MADARA  3.2.3
Fragmentation.cpp
Go to the documentation of this file.
1 #include "Fragmentation.h"
2 #include "ReducedMessageHeader.h"
5 
6 #include <algorithm>
7 #include <time.h>
8 
10 : MessageHeader (), update_number (0)
11 {
13  madara_id[7] = 0;
14 
15  originator[0] = 0;
16  domain[0] = 0;
17 }
18 
19 
21 {
22 }
23 
24 uint32_t
26 {
27  return sizeof (uint64_t) * 3 // size, clock, timestamp
30  + sizeof (uint32_t) * 4; // type, updates, quality, update_number
31 }
32 
33 uint32_t
35 {
36  return sizeof (uint64_t) * 3 // size, clock, timestamp
39  + sizeof (uint32_t) * 4; // type, updates, quality, update_number
40 }
41 
42 
43 
44 uint32_t
46 {
47  buffer += 116;
48  return (madara::utility::endian_swap (*(uint32_t *)buffer));
49 }
50 
51 const char *
53  int64_t & buffer_remaining)
54 {
55  // Remove size field from the buffer and update accordingly
56  if ((size_t)buffer_remaining >= sizeof (size))
57  {
58  memcpy (&size, buffer, sizeof (size));
60  buffer += sizeof (size);
61  }
62  buffer_remaining -= sizeof (size);
63 
64  // Remove madara_id field from the buffer and update accordingly
65  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_IDENTIFIER_LENGTH)
66  {
67  strncpy (madara_id, buffer, MADARA_IDENTIFIER_LENGTH);
68  buffer += sizeof (char) * MADARA_IDENTIFIER_LENGTH;
69  }
70  buffer_remaining -= sizeof (char) * MADARA_IDENTIFIER_LENGTH;
71 
72  // Remove domain field from the buffer and update accordingly
73  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH)
74  {
75  strncpy (domain, buffer, MADARA_DOMAIN_MAX_LENGTH);
76  buffer += sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
77  }
78  buffer_remaining -= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
79 
80  // Remove originator from the buffer and update accordingly
81  if ((size_t)buffer_remaining >= sizeof (char) * MAX_ORIGINATOR_LENGTH)
82  {
83  strncpy (originator, buffer, MAX_ORIGINATOR_LENGTH);
84  buffer += sizeof (char) * MAX_ORIGINATOR_LENGTH;
85  }
86  buffer_remaining -= sizeof (char) * MAX_ORIGINATOR_LENGTH;
87 
88  // Remove type field from the buffer and update accordingly
89  if ((size_t)buffer_remaining >= sizeof (type))
90  {
91  memcpy (&type, buffer, sizeof (type));
93  buffer += sizeof (type);
94  }
95  buffer_remaining -= sizeof (type);
96 
97  // Remove updates field from the buffer and update accordingly
98  if ((size_t)buffer_remaining >= sizeof (updates))
99  {
100  memcpy (&updates, buffer, sizeof (updates));
102  buffer += sizeof (updates);
103  }
104  buffer_remaining -= sizeof (updates);
105 
106  // Remove quality field from the buffer and update accordingly
107  if ((size_t)buffer_remaining >= sizeof (quality))
108  {
109  memcpy (&quality, buffer, sizeof (quality));
111  buffer += sizeof (quality);
112  }
113  buffer_remaining -= sizeof (quality);
114 
115  // Remove clock field from the buffer and update accordingly
116  if ((size_t)buffer_remaining >= sizeof (clock))
117  {
118  memcpy (&clock, buffer, sizeof (clock));
120  buffer += sizeof (clock);
121  }
122  buffer_remaining -= sizeof (clock);
123 
124  // Remove timestamp field from the buffer and update accordingly
125  if ((size_t)buffer_remaining >= sizeof (timestamp))
126  {
127  memcpy (&timestamp, buffer, sizeof (timestamp));
129  buffer += sizeof (timestamp);
130  }
131  buffer_remaining -= sizeof (timestamp);
132 
133  // Remove the time to live field from the buffer
134  if (buffer_remaining >= 1)
135  {
136  memcpy (&ttl, buffer, 1);
137  buffer += 1;
138  }
139  buffer_remaining -= 1;
140 
141  // Remove updates field from the buffer and update accordingly
142  if ((size_t)buffer_remaining >= sizeof (update_number))
143  {
144  memcpy (&update_number, buffer, sizeof (update_number));
146  buffer += sizeof (update_number);
147  }
148  buffer_remaining -= sizeof (update_number);
149 
150  return buffer;
151 }
152 
153 char *
155  int64_t & buffer_remaining)
156 {
157  // Write size field from the buffer and update accordingly
158  if ((size_t)buffer_remaining >= sizeof (size))
159  {
160  *(uint64_t *) buffer = madara::utility::endian_swap (size);
161  buffer += sizeof (size);
162  }
163  buffer_remaining -= sizeof (size);
164 
165  // Write madara_id field from the buffer and update accordingly
166  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_IDENTIFIER_LENGTH)
167  {
168  strncpy (buffer, madara_id, MADARA_IDENTIFIER_LENGTH);
169  buffer += sizeof (char) * MADARA_IDENTIFIER_LENGTH;
170  }
171  buffer_remaining -= sizeof (char) * MADARA_IDENTIFIER_LENGTH;
172 
173  // Write domain field from the buffer and update accordingly
174  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH)
175  {
176  strncpy (buffer, domain, MADARA_DOMAIN_MAX_LENGTH);
177  buffer += sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
178  }
179  buffer_remaining -= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
180 
181  // Write originator from the buffer and update accordingly
182  if ((size_t)buffer_remaining >= sizeof (char) * MAX_ORIGINATOR_LENGTH)
183  {
184  strncpy (buffer, originator, MAX_ORIGINATOR_LENGTH);
185  buffer += sizeof (char) * MAX_ORIGINATOR_LENGTH;
186  }
187  buffer_remaining -= sizeof (char) * MAX_ORIGINATOR_LENGTH;
188 
189  // Write type field from the buffer and update accordingly
190  if ((size_t)buffer_remaining >= sizeof (type))
191  {
192  *(uint32_t *) buffer = madara::utility::endian_swap (type);
193  buffer += sizeof (type);
194  }
195  buffer_remaining -= sizeof (type);
196 
197  // Write updates field from the buffer and update accordingly
198  if ((size_t)buffer_remaining >= sizeof (updates))
199  {
200  *(uint32_t *) buffer = madara::utility::endian_swap (updates);
201  buffer += sizeof (updates);
202  }
203  buffer_remaining -= sizeof (updates);
204 
205  // Write quality field from the buffer and update accordingly
206  if ((size_t)buffer_remaining >= sizeof (quality))
207  {
208  *(uint32_t *) buffer = madara::utility::endian_swap (quality);
209  buffer += sizeof (quality);
210  }
211  buffer_remaining -= sizeof (quality);
212 
213  // Write clock field from the buffer and update accordingly
214  if ((size_t)buffer_remaining >= sizeof (clock))
215  {
216  *(uint64_t *) buffer = madara::utility::endian_swap (clock);
217  buffer += sizeof (clock);
218  }
219  buffer_remaining -= sizeof (clock);
220 
221  // Write timestamp field from the buffer and update accordingly
222  if ((size_t)buffer_remaining >= sizeof (timestamp))
223  {
224  *(uint64_t *) buffer = madara::utility::endian_swap (timestamp);
225  buffer += sizeof (timestamp);
226  }
227  buffer_remaining -= sizeof (timestamp);
228 
229  if (buffer_remaining >= 1)
230  {
231  memcpy (buffer, &ttl, 1);
232  buffer += 1;
233  }
234  buffer_remaining -= 1;
235 
236  // Write updates field from the buffer and update accordingly
237  if ((size_t)buffer_remaining >= sizeof (update_number))
238  {
239  *(uint32_t *) buffer = madara::utility::endian_swap (update_number);
240  buffer += sizeof (update_number);
241  }
242  buffer_remaining -= sizeof (update_number);
243 
244  return buffer;
245 }
246 
247 bool
249  const MessageHeader & other)
250 {
251  const FragmentMessageHeader * rhs = dynamic_cast <
252  const FragmentMessageHeader *> (&other);
253 
254  return size == rhs->size &&
255  type == rhs->type &&
256  updates == rhs->updates &&
257  update_number == rhs->update_number &&
258  quality == rhs->quality &&
259  clock == rhs->clock &&
260  timestamp == rhs->timestamp &&
261  strncmp (madara_id, rhs->madara_id, MADARA_IDENTIFIER_LENGTH) == 0 &&
262  strncmp (domain, rhs->domain, MADARA_DOMAIN_MAX_LENGTH) == 0 &&
263  strncmp (originator, rhs->originator, MAX_ORIGINATOR_LENGTH) == 0;
264 }
265 
266 
268 {
269  char * result = 0;
270 
272  "transport::defrag:" \
273  " defragging fragment map\n");
274 
275  FragmentMap::iterator i = map.find (0);
276  if (i != map.end ())
277  {
278  FragmentMessageHeader header;
279  int64_t buffer_remaining (header.encoded_size ());
280  const char * buffer = header.read (i->second, buffer_remaining);
281 
282  // do we have enough updates to defragment?
283  if (header.updates <= map.size ())
284  {
286  "transport::defrag:" \
287  " the map is large enough to contain updates\n");
288 
289  int64_t size = 0;
291  {
293  "transport::defrag:" \
294  " regular message header detected\n");
295 
296  MessageHeader contents_header;
297  buffer_remaining = contents_header.encoded_size ();
298  contents_header.read (buffer, buffer_remaining);
299  size = contents_header.size;
300  }
302  {
304  "transport::defrag:" \
305  " regular message header detected\n");
306 
307  ReducedMessageHeader contents_header;
308  buffer_remaining = contents_header.encoded_size ();
309  contents_header.read (buffer, buffer_remaining);
310  size = contents_header.size;
311  }
312 
313  result = new char [size];
314  char * lhs = result;
315 
316  uint32_t actual_size = (uint32_t)header.size - header.encoded_size ();
317  buffer_remaining = actual_size;
318 
319  if (size >= 0)
320  {
322  "transport::defrag: copying buffer to lhs\n");
323 
324  memcpy (lhs, buffer, buffer_remaining);
325  buffer += actual_size;
326  lhs += actual_size;
327  size -= actual_size;
328  }
329 
330  if (i != map.end ())
331  ++i;
332 
333  // if so, iterate over the fragments and copy the contents
334  for (;i != map.end (); ++i)
335  {
337  "transport::defrag: reading header of new fragment\n");
338 
339  buffer_remaining = header.encoded_size ();
340  buffer = header.read (i->second, buffer_remaining);
341  actual_size = (uint32_t)header.size - header.encoded_size ();
342  buffer_remaining = actual_size;
343 
344  if (size >= 0)
345  {
347  "transport::defrag: copying buffer to lhs\n");
348 
349  memcpy (lhs, buffer, buffer_remaining);
350  buffer += actual_size;
351  lhs += actual_size;
352  size -= actual_size;
353  }
354  }
355  }
356  }
357 
358  return result;
359 }
360 
361 void
363 {
364  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
365  {
366  delete [] i->second;
367  }
368  map.clear ();
369 }
370 
371 char *
373  uint32_t update_number, const char * fragment,
374  uint32_t queue_length,
375  OriginatorFragmentMap & map, bool clear)
376 {
377  char * result = 0;
378 
390  "transport::add_fragment: adding fragment\n");
391 
392  char * new_fragment = 0;
393  FragmentMessageHeader header;
394  int64_t buffer_remaining = header.encoded_size ();
395 
397  "transport::add_fragment:" \
398  " reading header from buffer\n");
399 
400  header.read (fragment, buffer_remaining);
401 
402  if (header.size > 0)
403  {
405  "transport::add_fragment:" \
406  " reading header from hold fragment\n");
407 
408  new_fragment = new char [header.size];
409  memcpy (new_fragment, fragment, header.size);
410  }
411  else
412  return 0;
413 
415  "transport::add_fragment:" \
416  " searching for originator %s.\n", originator);
417 
418  OriginatorFragmentMap::iterator orig_map = map.find (originator);
419  if (orig_map == map.end ())
420  {
422  "transport::add_fragment:" \
423  " creating entry for originator %s.\n", originator);
424 
425  // originator does not exist (1)
426  map[originator][clock][update_number] = new_fragment;
427  }
428 
429  else
430  {
432  "transport::add_fragment:" \
433  " originator %s exists in fragment map.\n", originator);
434 
435  ClockFragmentMap & clock_map (orig_map->second);
436  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
437 
438  if (clock_found != clock_map.end ())
439  {
440  // we have found the clock entry
441  if (clock_found->second.find (update_number)
442  == clock_found->second.end ())
443  {
444  if (clock_found->second.size () != 0)
445  {
447  "transport::add_fragment:" \
448  " %s:%d is being added.\n", originator, update_number);
449 
450  // the fragment does not exist yet
451  clock_found->second [update_number] = new_fragment;
452 
453  // check for a new buffer
454  result = defrag (clock_found->second);
455 
456  if (result && clear)
457  {
459  "transport::add_fragment:" \
460  " %s:%d is complete. Deleting fragments.\n",
461  originator, update_number);
462 
463  delete_fragments (clock_found->second);
464  clock_map.erase (clock);
465  }
466  else
467  {
469  "transport::add_fragment:" \
470  " %s:%d is complete. Need more fragments.\n",
471  originator, update_number);
472  }
473  }
474  else
475  {
476  // if we get here, the message has already been defragged
478  "transport::add_fragment:" \
479  " %s:%d has been previously defragged and fragments deleted.\n",
480  originator, update_number);
481 
482  }
483  }
484  }
485 
486  else if (clock_map.size () < queue_length)
487  {
488  // if we get here, the message has already been defragged
490  "transport::add_fragment:" \
491  " %s:%d is being added to queue.\n",
492  originator, update_number);
493 
494  // clock queue has not been exhausted (2)
495  clock_map [clock][update_number] = new_fragment;
496  }
497  else
498  {
500  "transport::add_fragment:" \
501  " %s:%d is being added to queue after a deletion\n",
502  originator, update_number);
503 
504  uint32_t oldest = (uint32_t)clock_map.begin ()->first;
505 
506  if (oldest < clock)
507  {
509  "transport::add_fragment:" \
510  " deleting fragments.\n",
511  originator, update_number);
512 
513  FragmentMap & fragments = clock_map [oldest];
514 
515  // delete all fragments in the clock entry
516  for (FragmentMap::iterator i = fragments.begin ();
517  i != fragments.end (); ++i)
518  {
519  delete [] i->second;
520  }
521 
523  "transport::add_fragment:" \
524  " erasing old clock.\n",
525  originator, update_number);
526 
527  // erase the oldest clock fragment map
528  clock_map.erase (oldest);
529 
530  // replace it in the logical queue with the new clock fragment
531  clock_map [clock][update_number] = new_fragment;
532  }
533  }
534  }
535 
536  return result;
537 }
538 
539 void
541  MessageHeader & header)
542 {
543  if (this != &header)
544  {
545  clock = header.clock;
546  strncpy (domain, header.domain, MADARA_DOMAIN_MAX_LENGTH);
547  strncpy (originator, header.originator, MAX_ORIGINATOR_LENGTH);
548  quality = header.quality;
549  size = header.size;
550  timestamp = header.timestamp;
551  ttl = header.ttl;
552  type = header.type;
553  updates = header.updates;
554  }
555 }
556 
557 
558 void
559 madara::transport::frag (const char * source,
560  uint32_t fragment_size, FragmentMap & map)
561 {
562  if (fragment_size > 0)
563  {
565  "transport::frag:" \
566  " fragmenting character stream into %d byte packets.\n",
567  fragment_size);
568 
569  uint32_t data_per_packet = fragment_size -
571 
572  const char * buffer = source;
573  uint64_t total_size;
574  FragmentMessageHeader header;
576  {
578  "transport::frag:" \
579  " regular message header detected\n",
580  fragment_size);
581 
582  MessageHeader contents_header;
583  int64_t buffer_remaining = contents_header.encoded_size ();
584  contents_header.read (source, buffer_remaining);
585  header = contents_header;
586  }
588  {
590  "transport::frag:" \
591  " regular message header detected\n");
592 
593  ReducedMessageHeader contents_header;
594  int64_t buffer_remaining = contents_header.encoded_size ();
595  contents_header.read (source, buffer_remaining);
596  header = contents_header;
597  }
598 
599  total_size = header.size;
600  header.updates = (uint32_t)header.size / data_per_packet;
601 
602  if (header.size % data_per_packet != 0)
603  ++header.updates;
604 
606  "transport::frag:" \
607  " iterating over %d updates.\n",
608  header.updates);
609 
610  for (uint32_t i = 0; i < header.updates; ++i)
611  {
612  char * new_frag;
613  size_t cur_size;
614  int64_t buffer_remaining;
615  uint64_t actual_data_size;
616 
617  if (i == header.updates - 1)
618  cur_size = (size_t) total_size +
619  FragmentMessageHeader::static_encoded_size ();
620  else
621  cur_size = (size_t) fragment_size;
622 
623  buffer_remaining = cur_size;
624  actual_data_size = cur_size -
626  new_frag = new char [cur_size];
627 
628  map[i] = new_frag;
629 
631  "transport::frag:" \
632  " writing %d packet of size %d.\n",
633  i, cur_size);
634 
635  header.update_number = i;
636  header.size = cur_size;
637  new_frag = header.write (new_frag, buffer_remaining);
638  memcpy (new_frag, buffer, (size_t)actual_data_size);
639  buffer += actual_data_size;
640  total_size -= actual_data_size;
641  }
642  }
643 }
644 
645 bool
647  OriginatorFragmentMap & map)
648 {
649  bool result = false;
650 
651  OriginatorFragmentMap::iterator orig_map = map.find (originator);
652 
653  if (orig_map != map.end ())
654  {
656  "transport::is_complete:" \
657  " %s was found.\n",
658  originator);
659 
660  ClockFragmentMap & clock_map (orig_map->second);
661  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
662 
663  if (clock_found != clock_map.end ())
664  {
666  "transport::is_complete:" \
667  " %s:%" PRIu64 " was found.\n",
668  originator, clock);
669 
670  uint64_t size = clock_found->second.size ();
671  FragmentMap::iterator i = clock_found->second.find (0);
672 
673  if (i != clock_found->second.end ())
674  {
676  "transport::is_complete:" \
677  " %s:%" PRIu64 ": 0 == fragmap.end.\n",
678  originator, clock);
679 
680  if (FragmentMessageHeader::get_updates (i->second) == size)
681  {
684  "transport::is_complete:" \
685  " %s:%" PRIu64 ": size == %" PRIu64
686  ", updates == %" PRIu32 ". COMPLETE\n",
687  originator, clock, size,
689 
690  result = true;
691  }
692  else
693  {
696  "transport::is_complete:" \
697  " %s:%" PRIu64 ": size == %" PRIu64
698  ", updates == %" PRIu32 ". INCOMPLETE\n",
699  originator, clock, size,
701  }
702  }
703  else if (size == 0)
704  {
706  "transport::is_complete:" \
707  " %s:%" PRIu64 ": size == 0 and i == 0. COMPLETE\n",
708  originator, clock);
709 
710  result = true;
711  }
712  }
713  else
714  {
716  "transport::is_complete:" \
717  " %s:%" PRIu64 " was not found.\n",
718  originator, clock);
719  }
720  }
721  else
722  {
724  "transport::is_complete:" \
725  " %s was not found.\n",
726  originator);
727  }
728 
729  return result;
730 }
731 
732 bool
734  uint32_t update_number, OriginatorFragmentMap & map)
735 {
736  bool result = false;
737 
738  OriginatorFragmentMap::iterator orig_map = map.find (originator);
739 
740  if (orig_map != map.end ())
741  {
742  ClockFragmentMap & clock_map (orig_map->second);
743  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
744 
745  if (clock_found != clock_map.end ())
746  {
747  if (clock_found->second.find (update_number)
748  != clock_found->second.end ())
749  {
750  result = true;
751  }
752  }
753  }
754 
755  return result;
756 }
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
MADARA_EXPORT char * defrag(FragmentMap &map)
Pieces together a fragment map into a single buffer.
char madara_id[8]
the identifier of this transport (MADARA_IDENTIFIER)
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
MADARA_EXPORT bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Breaks a large packet into smaller packets.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual ~FragmentMessageHeader()
Destructor.
uint32_t updates
the number of knowledge variable updates in the message
#define FRAGMENTATION_MADARA_ID
Definition: Fragmentation.h:24
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
#define MAX_ORIGINATOR_LENGTH
Definition: MessageHeader.h:28
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:48
uint32_t type
the type of message
static uint32_t get_updates(const char *buffer)
Returns the number of updates indicated in the header.
virtual bool equals(const MessageHeader &other)
Compares the fields of this instance to another instance.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.inl:115
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
uint64_t size
the size of this header plus the updates
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
unsigned char ttl
time to live (number of rebroadcasts to perform after original send
uint64_t timestamp
the timestamp of the sender when the message was generated
void operator=(MessageHeader &header)
Assignment operator for regular message header.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
static bool message_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:21
uint64_t clock
the clock of the sender when the message was generated
Defines a simple, smaller message header of 29 bytes that supports less QoS.
#define MADARA_DOMAIN_MAX_LENGTH
Definition: MessageHeader.h:23
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...
uint32_t quality
the quality of the message sender
MADARA_EXPORT void frag(const char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.