27 return sizeof (uint64_t) * 3
30 +
sizeof (uint32_t) * 4;
36 return sizeof (uint64_t) * 3
39 +
sizeof (uint32_t) * 4;
53 int64_t & buffer_remaining)
56 if ((
size_t)buffer_remaining >=
sizeof (
size))
58 memcpy (&
size, buffer,
sizeof (
size));
60 buffer +=
sizeof (
size);
62 buffer_remaining -=
sizeof (
size);
89 if ((
size_t)buffer_remaining >=
sizeof (
type))
91 memcpy (&
type, buffer,
sizeof (
type));
93 buffer +=
sizeof (
type);
95 buffer_remaining -=
sizeof (
type);
98 if ((
size_t)buffer_remaining >=
sizeof (
updates))
104 buffer_remaining -=
sizeof (
updates);
107 if ((
size_t)buffer_remaining >=
sizeof (
quality))
113 buffer_remaining -=
sizeof (
quality);
116 if ((
size_t)buffer_remaining >=
sizeof (
clock))
120 buffer +=
sizeof (
clock);
122 buffer_remaining -=
sizeof (
clock);
125 if ((
size_t)buffer_remaining >=
sizeof (
timestamp))
134 if (buffer_remaining >= 1)
136 memcpy (&
ttl, buffer, 1);
139 buffer_remaining -= 1;
155 int64_t & buffer_remaining)
158 if ((
size_t)buffer_remaining >=
sizeof (
size))
161 buffer +=
sizeof (
size);
163 buffer_remaining -=
sizeof (
size);
190 if ((
size_t)buffer_remaining >=
sizeof (
type))
193 buffer +=
sizeof (
type);
195 buffer_remaining -=
sizeof (
type);
198 if ((
size_t)buffer_remaining >=
sizeof (
updates))
203 buffer_remaining -=
sizeof (
updates);
206 if ((
size_t)buffer_remaining >=
sizeof (
quality))
211 buffer_remaining -=
sizeof (
quality);
214 if ((
size_t)buffer_remaining >=
sizeof (
clock))
217 buffer +=
sizeof (
clock);
219 buffer_remaining -=
sizeof (
clock);
222 if ((
size_t)buffer_remaining >=
sizeof (
timestamp))
229 if (buffer_remaining >= 1)
231 memcpy (buffer, &
ttl, 1);
234 buffer_remaining -= 1;
272 "transport::defrag:" \
273 " defragging fragment map\n");
275 FragmentMap::iterator i = map.find (0);
280 const char * buffer = header.
read (i->second, buffer_remaining);
283 if (header.
updates <= map.size ())
286 "transport::defrag:" \
287 " the map is large enough to contain updates\n");
293 "transport::defrag:" \
294 " regular message header detected\n");
298 contents_header.
read (buffer, buffer_remaining);
299 size = contents_header.
size;
304 "transport::defrag:" \
305 " regular message header detected\n");
309 contents_header.
read (buffer, buffer_remaining);
310 size = contents_header.
size;
313 result =
new char [
size];
317 buffer_remaining = actual_size;
322 "transport::defrag: copying buffer to lhs\n");
324 memcpy (lhs, buffer, buffer_remaining);
325 buffer += actual_size;
334 for (;i != map.end (); ++i)
337 "transport::defrag: reading header of new fragment\n");
340 buffer = header.
read (i->second, buffer_remaining);
342 buffer_remaining = actual_size;
347 "transport::defrag: copying buffer to lhs\n");
349 memcpy (lhs, buffer, buffer_remaining);
350 buffer += actual_size;
364 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
374 uint32_t queue_length,
390 "transport::add_fragment: adding fragment\n");
392 char * new_fragment = 0;
397 "transport::add_fragment:" \
398 " reading header from buffer\n");
400 header.
read (fragment, buffer_remaining);
405 "transport::add_fragment:" \
406 " reading header from hold fragment\n");
408 new_fragment =
new char [header.
size];
409 memcpy (new_fragment, fragment, header.
size);
415 "transport::add_fragment:" \
416 " searching for originator %s.\n", originator);
418 OriginatorFragmentMap::iterator orig_map = map.find (originator);
419 if (orig_map == map.end ())
422 "transport::add_fragment:" \
423 " creating entry for originator %s.\n", originator);
432 "transport::add_fragment:" \
433 " originator %s exists in fragment map.\n", originator);
436 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
438 if (clock_found != clock_map.end ())
441 if (clock_found->second.find (update_number)
442 == clock_found->second.end ())
444 if (clock_found->second.size () != 0)
447 "transport::add_fragment:" \
448 " %s:%d is being added.\n", originator, update_number);
454 result =
defrag (clock_found->second);
459 "transport::add_fragment:" \
460 " %s:%d is complete. Deleting fragments.\n",
461 originator, update_number);
464 clock_map.erase (clock);
469 "transport::add_fragment:" \
470 " %s:%d is complete. Need more fragments.\n",
471 originator, update_number);
478 "transport::add_fragment:" \
479 " %s:%d has been previously defragged and fragments deleted.\n",
480 originator, update_number);
486 else if (clock_map.size () < queue_length)
490 "transport::add_fragment:" \
491 " %s:%d is being added to queue.\n",
492 originator, update_number);
500 "transport::add_fragment:" \
501 " %s:%d is being added to queue after a deletion\n",
502 originator, update_number);
504 uint32_t oldest = (uint32_t)clock_map.begin ()->first;
509 "transport::add_fragment:" \
510 " deleting fragments.\n",
511 originator, update_number);
516 for (FragmentMap::iterator i = fragments.begin ();
517 i != fragments.end (); ++i)
523 "transport::add_fragment:" \
524 " erasing old clock.\n",
525 originator, update_number);
528 clock_map.erase (oldest);
562 if (fragment_size > 0)
566 " fragmenting character stream into %d byte packets.\n",
569 uint32_t data_per_packet = fragment_size -
572 const char * buffer = source;
579 " regular message header detected\n",
583 int64_t buffer_remaining = contents_header.
encoded_size ();
584 contents_header.
read (source, buffer_remaining);
585 header = contents_header;
591 " regular message header detected\n");
594 int64_t buffer_remaining = contents_header.
encoded_size ();
595 contents_header.
read (source, buffer_remaining);
596 header = contents_header;
599 total_size = header.
size;
600 header.
updates = (uint32_t)header.
size / data_per_packet;
602 if (header.
size % data_per_packet != 0)
607 " iterating over %d updates.\n",
610 for (uint32_t i = 0; i < header.
updates; ++i)
614 int64_t buffer_remaining;
615 uint64_t actual_data_size;
618 cur_size = (size_t) total_size +
619 FragmentMessageHeader::static_encoded_size ();
621 cur_size = (size_t) fragment_size;
623 buffer_remaining = cur_size;
624 actual_data_size = cur_size -
626 new_frag =
new char [cur_size];
632 " writing %d packet of size %d.\n",
636 header.
size = cur_size;
637 new_frag = header.
write (new_frag, buffer_remaining);
638 memcpy (new_frag, buffer, (
size_t)actual_data_size);
639 buffer += actual_data_size;
640 total_size -= actual_data_size;
651 OriginatorFragmentMap::iterator orig_map = map.find (originator);
653 if (orig_map != map.end ())
656 "transport::is_complete:" \
661 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
663 if (clock_found != clock_map.end ())
666 "transport::is_complete:" \
667 " %s:%" PRIu64
" was found.\n",
670 uint64_t
size = clock_found->second.size ();
671 FragmentMap::iterator i = clock_found->second.find (0);
673 if (i != clock_found->second.end ())
676 "transport::is_complete:" \
677 " %s:%" PRIu64
": 0 == fragmap.end.\n",
684 "transport::is_complete:" \
685 " %s:%" PRIu64
": size == %" PRIu64
686 ", updates == %" PRIu32
". COMPLETE\n",
687 originator, clock, size,
696 "transport::is_complete:" \
697 " %s:%" PRIu64
": size == %" PRIu64
698 ", updates == %" PRIu32
". INCOMPLETE\n",
699 originator, clock, size,
706 "transport::is_complete:" \
707 " %s:%" PRIu64
": size == 0 and i == 0. COMPLETE\n",
716 "transport::is_complete:" \
717 " %s:%" PRIu64
" was not found.\n",
724 "transport::is_complete:" \
725 " %s was not found.\n",
738 OriginatorFragmentMap::iterator orig_map = map.find (originator);
740 if (orig_map != map.end ())
743 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
745 if (clock_found != clock_map.end ())
747 if (clock_found->second.find (update_number)
748 != clock_found->second.end ())
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is complete.
MADARA_EXPORT char * defrag(FragmentMap &map)
Pieces together a fragment map into a single buffer.
MADARA_EXPORT bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Checks if the fragments stored for an originator and clock form a complete message.
#define FRAGMENTATION_MADARA_ID
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
MADARA_EXPORT void frag(const char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.