MADARA  3.4.1
Fragmentation.cpp
Go to the documentation of this file.
1 #include "Fragmentation.h"
2 #include "ReducedMessageHeader.h"
6 
7 #include <algorithm>
8 #include <time.h>
9 #include <sstream>
10 
// Constructor tail. The declarator line is absent from this listing
// (presumably the FragmentMessageHeader default constructor -- confirm).
// Default-constructs the MessageHeader base and zeroes the two
// fragment-specific counters.
12  : MessageHeader(), update_number(0), total_size(0)
13 {
    // NOTE(review): listing line 14 is absent here; whatever it did to
    // madara_id cannot be seen. Index 7 is the last slot of char madara_id[8].
15  madara_id[7] = 0;
16 
    // originator and domain start out as empty C strings
17  originator[0] = 0;
18  domain[0] = 0;
19 }
20 
22 
// Returns the number of bytes the encoded fragment header occupies.
// Going by the inline tallies the sum is 32 + 105 + 16 = 153 bytes.
// NOTE(review): the declarator and listing line 26 (the start of the
// parenthesised char-array term) are absent from this listing, so the
// expression below is not complete as shown.
24 {
25  return sizeof(uint64_t) * 4 // size, clock, timestamp, total_size (4)=32
27  MAX_ORIGINATOR_LENGTH + 1) + // 8+32+64+1=105
28  sizeof(uint32_t) * 4; // type, updates, quality, update_number(4)=16
29 }
30 
// Returns the number of bytes the encoded fragment header occupies; the
// visible expression is the same as in the function directly above.
// Going by the inline tallies the sum is 32 + 105 + 16 = 153 bytes.
// NOTE(review): the declarator and listing line 34 (the start of the
// parenthesised char-array term) are absent from this listing.
32 {
33  return sizeof(uint64_t) * 4 // size, clock, timestamp, total_size=32
35  MAX_ORIGINATOR_LENGTH + 1) + // 8+32+64+1=105
36  sizeof(uint32_t) * 4; // type, updates, quality, update_number=16
37 }
38 
40  const char* buffer)
41 {
42  buffer += 116;
43  return(madara::utility::endian_swap(*(uint32_t*)buffer));
44 }
45 
// Builds a printable description of this header: the fragment-specific
// fields (update_number, total_size) followed by MessageHeader::to_string().
// The "4:" / "8:" prefixes presumably denote the encoded width of each
// field in bytes, and "frag 12" their sum -- confirm.
// NOTE(review): the declarator line is absent from this listing.
47 {
48  std::stringstream buffer;
49  buffer << "frag 12: ";
50  buffer << "index(4:" << update_number << "), ";
51  buffer << "total_size(8:" << total_size << "), msg: ";
52  buffer << MessageHeader::to_string();
53 
54  return buffer.str();
55 }
56 
// Decodes a fragment header from buffer one field at a time, in this order:
// size, madara_id, domain, originator, type, updates, quality, clock,
// timestamp, ttl, update_number, total_size. Integer fields are copied out
// with memcpy and then passed through madara::utility::endian_swap.
//
// @param buffer            start of the encoded header
// @param buffer_remaining  bytes available at buffer; reduced by the width
//                          of every field that is consumed
// @return pointer to the first byte after the header
// @throws exceptions::MemoryException when a field does not fit into the
//         remaining byte count
//
// NOTE(review): most checks cast buffer_remaining to size_t, so a negative
// count wraps to a huge value and passes -- confirm callers never pass one.
// NOTE(review): inside each else branch the local std::stringstream named
// buffer shadows the buffer parameter.
58  const char* buffer, int64_t& buffer_remaining)
59 {
60  // Remove size field from the buffer and update accordingly
61  if((size_t)buffer_remaining >= sizeof(size))
62  {
63  memcpy(&size, buffer, sizeof(size));
64  size = madara::utility::endian_swap(size);
65  buffer += sizeof(size);
66  }
67  else
68  {
69  std::stringstream buffer;
70  buffer << "FragmentMessageHeader::read: ";
71  buffer << sizeof(size) << " byte size field cannot fit in ";
72  buffer << buffer_remaining << " byte buffer\n";
73 
74  throw exceptions::MemoryException(buffer.str());
75  }
76  buffer_remaining -= sizeof(size);
77 
78  // Remove madara_id field from the buffer and update accordingly
79  if((size_t)buffer_remaining >= sizeof(char) * MADARA_IDENTIFIER_LENGTH)
80  {
    // NOTE(review): listing line 81 is absent; as shown the field is only
    // skipped. Presumably the missing line copies into madara_id -- confirm.
82  buffer += sizeof(char) * MADARA_IDENTIFIER_LENGTH;
83  }
84  else
85  {
86  std::stringstream buffer;
87  buffer << "FragmentMessageHeader::read: ";
88  buffer << MADARA_IDENTIFIER_LENGTH << " byte id encoding cannot fit in ";
89  buffer << buffer_remaining << " byte buffer\n";
90 
91  throw exceptions::MemoryException(buffer.str());
92  }
93  buffer_remaining -= sizeof(char) * MADARA_IDENTIFIER_LENGTH;
94 
95  // Remove domain field from the buffer and update accordingly
96  if((size_t)buffer_remaining >= sizeof(char) * MADARA_DOMAIN_MAX_LENGTH)
97  {
    // NOTE(review): listing line 98 is absent; as shown the field is only
    // skipped. Presumably the missing line copies into domain -- confirm.
99  buffer += sizeof(char) * MADARA_DOMAIN_MAX_LENGTH;
100  }
101  else
102  {
103  std::stringstream buffer;
104  buffer << "FragmentMessageHeader::read: ";
105  buffer << MADARA_DOMAIN_MAX_LENGTH << " byte domain encoding cannot";
106  buffer << " fit in ";
107  buffer << buffer_remaining << " byte buffer\n";
108 
109  throw exceptions::MemoryException(buffer.str());
110  }
111  buffer_remaining -= sizeof(char) * MADARA_DOMAIN_MAX_LENGTH;
112 
113  // Remove originator from the buffer and update accordingly
114  if((size_t)buffer_remaining >= sizeof(char) * MAX_ORIGINATOR_LENGTH)
115  {
116  utility::strncpy_safe(originator, buffer, MAX_ORIGINATOR_LENGTH);
117  buffer += sizeof(char) * MAX_ORIGINATOR_LENGTH;
118  }
119  else
120  {
121  std::stringstream buffer;
122  buffer << "FragmentMessageHeader::read: ";
123  buffer << MAX_ORIGINATOR_LENGTH << " byte originator encoding cannot";
124  buffer << " fit in ";
125  buffer << buffer_remaining << " byte buffer\n";
126 
127  throw exceptions::MemoryException(buffer.str());
128  }
129  buffer_remaining -= sizeof(char) * MAX_ORIGINATOR_LENGTH;
130 
131  // Remove type field from the buffer and update accordingly
132  if((size_t)buffer_remaining >= sizeof(type))
133  {
134  memcpy(&type, buffer, sizeof(type));
135  type = madara::utility::endian_swap(type);
136  buffer += sizeof(type);
137  }
138  else
139  {
140  std::stringstream buffer;
141  buffer << "FragmentMessageHeader::read: ";
142  buffer << sizeof(type) << " byte type encoding cannot";
143  buffer << " fit in ";
144  buffer << buffer_remaining << " byte buffer\n";
145 
146  throw exceptions::MemoryException(buffer.str());
147  }
148  buffer_remaining -= sizeof(type);
149 
150  // Remove updates field from the buffer and update accordingly
151  if((size_t)buffer_remaining >= sizeof(updates))
152  {
153  memcpy(&updates, buffer, sizeof(updates));
154  updates = madara::utility::endian_swap(updates);
155  buffer += sizeof(updates);
156  }
157  else
158  {
159  std::stringstream buffer;
160  buffer << "FragmentMessageHeader::read: ";
161  buffer << sizeof(updates) << " byte updates encoding cannot";
162  buffer << " fit in ";
163  buffer << buffer_remaining << " byte buffer\n";
164 
165  throw exceptions::MemoryException(buffer.str());
166  }
167  buffer_remaining -= sizeof(updates);
168 
169  // Remove quality field from the buffer and update accordingly
170  if((size_t)buffer_remaining >= sizeof(quality))
171  {
172  memcpy(&quality, buffer, sizeof(quality));
173  quality = madara::utility::endian_swap(quality);
174  buffer += sizeof(quality);
175  }
176  else
177  {
178  std::stringstream buffer;
179  buffer << "FragmentMessageHeader::read: ";
180  buffer << sizeof(quality) << " byte quality encoding cannot";
181  buffer << " fit in ";
182  buffer << buffer_remaining << " byte buffer\n";
183 
184  throw exceptions::MemoryException(buffer.str());
185  }
186  buffer_remaining -= sizeof(quality);
187 
188  // Remove clock field from the buffer and update accordingly
189  if((size_t)buffer_remaining >= sizeof(clock))
190  {
191  memcpy(&clock, buffer, sizeof(clock));
192  clock = madara::utility::endian_swap(clock);
193  buffer += sizeof(clock);
194  }
195  else
196  {
197  std::stringstream buffer;
198  buffer << "FragmentMessageHeader::read: ";
199  buffer << sizeof(clock) << " byte clock encoding cannot";
200  buffer << " fit in ";
201  buffer << buffer_remaining << " byte buffer\n";
202 
203  throw exceptions::MemoryException(buffer.str());
204  }
205  buffer_remaining -= sizeof(clock);
206 
207  // Remove timestamp field from the buffer and update accordingly
208  if((size_t)buffer_remaining >= sizeof(timestamp))
209  {
210  memcpy(&timestamp, buffer, sizeof(timestamp));
211  timestamp = madara::utility::endian_swap(timestamp);
212  buffer += sizeof(timestamp);
213  }
214  else
215  {
216  std::stringstream buffer;
217  buffer << "FragmentMessageHeader::read: ";
218  buffer << sizeof(timestamp) << " byte timestamp encoding cannot";
219  buffer << " fit in ";
220  buffer << buffer_remaining << " byte buffer\n";
221 
222  throw exceptions::MemoryException(buffer.str());
223  }
224  buffer_remaining -= sizeof(timestamp);
225 
226  // Remove the time to live field from the buffer
    // (a single byte, copied as-is with no endian swap)
227  if(buffer_remaining >= 1)
228  {
229  memcpy(&ttl, buffer, 1);
230  buffer += 1;
231  }
232  else
233  {
234  std::stringstream buffer;
235  buffer << "FragmentMessageHeader::read: ";
236  buffer << "1 byte ttl encoding cannot";
237  buffer << " fit in ";
238  buffer << buffer_remaining << " byte buffer\n";
239 
240  throw exceptions::MemoryException(buffer.str());
241  }
242  buffer_remaining -= 1;
243 
244  // Remove update number field from the buffer and update accordingly
245  if((size_t)buffer_remaining >= sizeof(update_number))
246  {
247  memcpy(&update_number, buffer, sizeof(update_number));
248  update_number = madara::utility::endian_swap(update_number);
249  buffer += sizeof(update_number);
250  }
251  else
252  {
253  std::stringstream buffer;
254  buffer << "FragmentMessageHeader::read: ";
255  buffer << sizeof(update_number) << " byte update number encoding cannot";
256  buffer << " fit in ";
257  buffer << buffer_remaining << " byte buffer\n";
258 
259  throw exceptions::MemoryException(buffer.str());
260  }
261  buffer_remaining -= sizeof(update_number);
262 
263  // Remove total size field from the buffer and update accordingly
264  if((size_t)buffer_remaining >= sizeof(total_size))
265  {
266  memcpy(&total_size, buffer, sizeof(total_size));
267  total_size = madara::utility::endian_swap(total_size);
268  buffer += sizeof(total_size);
269  }
270  else
271  {
272  std::stringstream buffer;
273  buffer << "FragmentMessageHeader::read: ";
274  buffer << sizeof(total_size) << " byte total size encoding cannot";
275  buffer << " fit in ";
276  buffer << buffer_remaining << " byte buffer\n";
277 
278  throw exceptions::MemoryException(buffer.str());
279  }
280  buffer_remaining -= sizeof(total_size);
281 
282  return buffer;
283 }
284 
// Encodes this fragment header into buffer one field at a time, in the same
// order read() consumes them: size, madara_id, domain, originator, type,
// updates, quality, clock, timestamp, ttl, update_number, total_size.
// Integer fields are passed through madara::utility::endian_swap first.
//
// @param buffer            destination for the encoded header
// @param buffer_remaining  bytes available at buffer; reduced by the width
//                          of every field that is written
// @return pointer to the first byte after the header
// @throws exceptions::MemoryException when a field does not fit into the
//         remaining byte count
//
// NOTE(review): integers are stored through casted uint32_t*/uint64_t*
// pointers. update_number and total_size follow the 1-byte ttl, so their
// stores cannot all be naturally aligned; read() uses memcpy instead.
// NOTE(review): most checks cast buffer_remaining to size_t, so a negative
// count wraps to a huge value and passes.
286  char* buffer, int64_t& buffer_remaining)
287 {
288  // Write size field from the buffer and update accordingly
289  if((size_t)buffer_remaining >= sizeof(size))
290  {
291  *(uint64_t*)buffer = madara::utility::endian_swap(size);
292  buffer += sizeof(size);
293  }
294  else
295  {
296  std::stringstream buffer;
297  buffer << "FragmentMessageHeader::write: ";
298  buffer << sizeof(size) << " byte size encoding cannot";
299  buffer << " fit in ";
300  buffer << buffer_remaining << " byte buffer\n";
301 
302  throw exceptions::MemoryException(buffer.str());
303  }
304  buffer_remaining -= sizeof(size);
305 
306  // Write madara_id field from the buffer and update accordingly
307  if((size_t)buffer_remaining >= sizeof(char) * MADARA_IDENTIFIER_LENGTH)
308  {
    // NOTE(review): listing line 309 is absent; as shown the field is only
    // skipped. Presumably the missing line copies the identifier -- confirm.
310  buffer += sizeof(char) * MADARA_IDENTIFIER_LENGTH;
311  }
312  else
313  {
314  std::stringstream buffer;
315  buffer << "FragmentMessageHeader::write: ";
316  buffer << MADARA_IDENTIFIER_LENGTH << " byte id encoding cannot";
317  buffer << " fit in ";
318  buffer << buffer_remaining << " byte buffer\n";
319 
320  throw exceptions::MemoryException(buffer.str());
321  }
322  buffer_remaining -= sizeof(char) * MADARA_IDENTIFIER_LENGTH;
323 
324  // Write domain field from the buffer and update accordingly
325  if((size_t)buffer_remaining >= sizeof(char) * MADARA_DOMAIN_MAX_LENGTH)
326  {
    // NOTE(review): listing line 327 is absent; as shown the field is only
    // skipped. Presumably the missing line copies the domain -- confirm.
328  buffer += sizeof(char) * MADARA_DOMAIN_MAX_LENGTH;
329  }
330  else
331  {
332  std::stringstream buffer;
333  buffer << "FragmentMessageHeader::write: ";
334  buffer << MADARA_DOMAIN_MAX_LENGTH << " byte domain encoding cannot";
335  buffer << " fit in ";
336  buffer << buffer_remaining << " byte buffer\n";
337 
338  throw exceptions::MemoryException(buffer.str());
339  }
340  buffer_remaining -= sizeof(char) * MADARA_DOMAIN_MAX_LENGTH;
341 
342  // Write originator from the buffer and update accordingly
343  if((size_t)buffer_remaining >= sizeof(char) * MAX_ORIGINATOR_LENGTH)
344  {
345  utility::strncpy_safe(buffer, originator, MAX_ORIGINATOR_LENGTH);
346  buffer += sizeof(char) * MAX_ORIGINATOR_LENGTH;
347  }
348  else
349  {
350  std::stringstream buffer;
351  buffer << "FragmentMessageHeader::write: ";
352  buffer << MAX_ORIGINATOR_LENGTH << " byte originator encoding cannot";
353  buffer << " fit in ";
354  buffer << buffer_remaining << " byte buffer\n";
355 
356  throw exceptions::MemoryException(buffer.str());
357  }
358  buffer_remaining -= sizeof(char) * MAX_ORIGINATOR_LENGTH;
359 
360  // Write type field from the buffer and update accordingly
361  if((size_t)buffer_remaining >= sizeof(type))
362  {
363  *(uint32_t*)buffer = madara::utility::endian_swap(type);
364  buffer += sizeof(type);
365  }
366  else
367  {
368  std::stringstream buffer;
369  buffer << "FragmentMessageHeader::write: ";
370  buffer << sizeof(type) << " byte type encoding cannot";
371  buffer << " fit in ";
372  buffer << buffer_remaining << " byte buffer\n";
373 
374  throw exceptions::MemoryException(buffer.str());
375  }
376  buffer_remaining -= sizeof(type);
377 
378  // Write updates field from the buffer and update accordingly
379  if((size_t)buffer_remaining >= sizeof(updates))
380  {
381  *(uint32_t*)buffer = madara::utility::endian_swap(updates);
382  buffer += sizeof(updates);
383  }
384  else
385  {
386  std::stringstream buffer;
387  buffer << "FragmentMessageHeader::write: ";
388  buffer << sizeof(updates) << " byte updates encoding cannot";
389  buffer << " fit in ";
390  buffer << buffer_remaining << " byte buffer\n";
391 
392  throw exceptions::MemoryException(buffer.str());
393  }
394  buffer_remaining -= sizeof(updates);
395 
396  // Write quality field from the buffer and update accordingly
397  if((size_t)buffer_remaining >= sizeof(quality))
398  {
399  *(uint32_t*)buffer = madara::utility::endian_swap(quality);
400  buffer += sizeof(quality);
401  }
402  else
403  {
404  std::stringstream buffer;
405  buffer << "FragmentMessageHeader::write: ";
406  buffer << sizeof(quality) << " byte quality encoding cannot";
407  buffer << " fit in ";
408  buffer << buffer_remaining << " byte buffer\n";
409 
410  throw exceptions::MemoryException(buffer.str());
411  }
412  buffer_remaining -= sizeof(quality);
413 
414  // Write clock field from the buffer and update accordingly
415  if((size_t)buffer_remaining >= sizeof(clock))
416  {
417  *(uint64_t*)buffer = madara::utility::endian_swap(clock);
418  buffer += sizeof(clock);
419  }
420  else
421  {
422  std::stringstream buffer;
423  buffer << "FragmentMessageHeader::write: ";
424  buffer << sizeof(clock) << " byte clock encoding cannot";
425  buffer << " fit in ";
426  buffer << buffer_remaining << " byte buffer\n";
427 
428  throw exceptions::MemoryException(buffer.str());
429  }
430  buffer_remaining -= sizeof(clock);
431 
432  // Write timestamp field from the buffer and update accordingly
433  if((size_t)buffer_remaining >= sizeof(timestamp))
434  {
435  *(uint64_t*)buffer = madara::utility::endian_swap(timestamp);
436  buffer += sizeof(timestamp);
437  }
438  else
439  {
440  std::stringstream buffer;
441  buffer << "FragmentMessageHeader::write: ";
442  buffer << sizeof(timestamp) << " byte timestamp encoding cannot";
443  buffer << " fit in ";
444  buffer << buffer_remaining << " byte buffer\n";
445 
446  throw exceptions::MemoryException(buffer.str());
447  }
448  buffer_remaining -= sizeof(timestamp);
449 
    // Write the time to live field (a single byte, copied as-is)
450  if(buffer_remaining >= 1)
451  {
452  memcpy(buffer, &ttl, 1);
453  buffer += 1;
454  }
455  else
456  {
457  std::stringstream buffer;
458  buffer << "FragmentMessageHeader::write: ";
459  buffer << "1 byte ttl encoding cannot";
460  buffer << " fit in ";
461  buffer << buffer_remaining << " byte buffer\n";
462 
463  throw exceptions::MemoryException(buffer.str());
464  }
465  buffer_remaining -= 1;
466 
467  // Write update number field to the buffer and update accordingly
468  if((size_t)buffer_remaining >= sizeof(update_number))
469  {
470  *(uint32_t*)buffer = madara::utility::endian_swap(update_number);
471  buffer += sizeof(update_number);
472  }
473  else
474  {
475  std::stringstream buffer;
476  buffer << "FragmentMessageHeader::write: ";
477  buffer << sizeof(update_number) << " byte update number encoding cannot";
478  buffer << " fit in ";
479  buffer << buffer_remaining << " byte buffer\n";
480 
481  throw exceptions::MemoryException(buffer.str());
482  }
483  buffer_remaining -= sizeof(update_number);
484 
485  // Write total size field to the buffer and update accordingly
486  if((size_t)buffer_remaining >= sizeof(total_size))
487  {
488  *(uint64_t*)buffer = madara::utility::endian_swap(total_size);
489  buffer += sizeof(total_size);
490  }
491  else
492  {
493  std::stringstream buffer;
494  buffer << "FragmentMessageHeader::write: ";
495  buffer << sizeof(total_size) << " byte total size encoding cannot";
496  buffer << " fit in ";
497  buffer << buffer_remaining << " byte buffer\n";
498 
499  throw exceptions::MemoryException(buffer.str());
500  }
501  buffer_remaining -= sizeof(total_size);
502 
503  return buffer;
504 }
505 
507  const MessageHeader& other)
508 {
509  const FragmentMessageHeader* rhs =
510  dynamic_cast<const FragmentMessageHeader*>(&other);
511 
512  return size == rhs->size && type == rhs->type && updates == rhs->updates &&
513  update_number == rhs->update_number &&
514  total_size == rhs->total_size &&
515  quality == rhs->quality &&
516  clock == rhs->clock && timestamp == rhs->timestamp &&
517  strncmp(madara_id, rhs->madara_id, MADARA_IDENTIFIER_LENGTH) == 0 &&
518  strncmp(domain, rhs->domain, MADARA_DOMAIN_MAX_LENGTH) == 0 &&
519  strncmp(originator, rhs->originator, MAX_ORIGINATOR_LENGTH) == 0;
520 }
521 
// Concatenates the payloads of the fragments in map into one buffer.
//
// Fragment 0 is decoded first. Reassembly proceeds only when its updates
// field does not exceed map.size(); otherwise nothing is allocated.
//
// @param map         fragment id -> encoded fragment (header then payload)
// @param total_size  set to the total_size of the most recently decoded
//                    header whenever fragment 0 is present
// @return buffer allocated with new[] holding the payloads, or 0 when
//         fragment 0 is missing or too few fragments are present; the
//         caller presumably owns it and must delete[] it -- confirm
//
// NOTE(review): the opening line of every logging call is absent from this
// listing, so only the argument lines are visible below.
522 char* madara::transport::defrag(FragmentMap& map, uint64_t & total_size)
523 {
524  char* result = 0;
525 
527  "transport::defrag:"
528  " defragging fragment map\n");
529 
530  FragmentMap::iterator i = map.find(0);
531  if(i != map.end())
532  {
533  FragmentMessageHeader header;
534  int64_t buffer_remaining(header.encoded_size());
    // after read(), buffer points at the payload of fragment 0
535  const char* buffer = header.read(i->second.get(), buffer_remaining);
536 
538  "transport::defrag:"
539  " inspecting frag: %s\n", header.to_string().c_str());
540 
541  // do we have enough updates to defragment?
542  if(header.updates <= map.size())
543  {
545  "transport::defrag:"
546  " the map is large enough to contain updates\n");
547 
548  int64_t size = 0;
549  // if(MessageHeader::message_header_test(buffer))
550  // {
551  // madara_logger_ptr_log(logger::global_logger.get(), logger::LOG_DETAILED,
552  // "transport::defrag:"
553  // " regular message header detected\n");
554 
555  // MessageHeader contents_header;
556  // buffer_remaining = contents_header.encoded_size();
557  // contents_header.read(buffer, buffer_remaining);
558  // size = contents_header.size;
559  // }
560  // else if(ReducedMessageHeader::message_header_test(buffer))
561  // {
562  // madara_logger_ptr_log(logger::global_logger.get(), logger::LOG_DETAILED,
563  // "transport::defrag:"
564  // " reduced message header detected\n");
565 
566  // ReducedMessageHeader contents_header;
567  // buffer_remaining = contents_header.encoded_size();
568  // contents_header.read(buffer, buffer_remaining);
569  // size = contents_header.size;
570  // }
571  size = header.total_size;
572 
    // NOTE(review): size is taken from the decoded header and used for the
    // allocation without validation. The size >= 0 tests below run after
    // the allocation and do not bound the memcpy length, so payloads that
    // sum to more than total_size would write past the end of result.
573  result = new char[size];
574  char* lhs = result;
575 
    // payload length = encoded fragment size minus the header
    // NOTE(review): unsigned arithmetic; header.size smaller than
    // encoded_size() would wrap to a very large value.
576  uint32_t actual_size = (uint32_t)header.size - header.encoded_size();
577  buffer_remaining = actual_size;
578 
579  if(size >= 0)
580  {
582  "transport::defrag: copying buffer to lhs\n");
583 
584  memcpy(lhs, buffer, buffer_remaining);
585  buffer += actual_size;
586  lhs += actual_size;
587  size -= actual_size;
588  }
589 
    // i still refers to fragment 0 here; step past it
590  if(i != map.end())
591  ++i;
592 
593  // if so, iterate over the fragments and copy the contents
    // (header is reused, so it holds the last fragment's fields afterwards)
594  for(; i != map.end(); ++i)
595  {
597  "transport::defrag: reading header of new fragment\n");
598 
599  buffer_remaining = header.encoded_size();
600  buffer = header.read(i->second.get(), buffer_remaining);
601  actual_size =(uint32_t)header.size - header.encoded_size();
602  buffer_remaining = actual_size;
603 
605  "transport::defrag:"
606  " piecing together frag: %s\n", header.to_string().c_str());
607 
608  if(size >= 0)
609  {
612  "transport::defrag: copying buffer to lhs\n");
613 
614  memcpy(lhs, buffer, buffer_remaining);
615  buffer += actual_size;
616  lhs += actual_size;
617  size -= actual_size;
618  }
619  }
620  }
621 
    // reported even when result stays 0 because fragments are still missing
622  total_size = header.total_size;
623  }
624 
625  return result;
626 }
627 
// Assigns a null pointer to every entry of map and then empties the map.
// Presumably the assignment makes each ScopedArray drop its reference to
// the fragment buffer -- confirm against ScopedArray.
// NOTE(review): the declarator line is absent from this listing.
629 {
630  for(FragmentMap::iterator i = map.begin(); i != map.end(); ++i)
631  {
632  i->second = (const char*)0;
633  }
634  map.clear();
635 }
636 
// Stores a copy of one encoded fragment under
// map[originator][clock][update_number] and, when the clock entry already
// held other fragments, attempts reassembly through defrag().
//
// @param originator     key of the sending host in map
// @param clock          clock value identifying the fragmented message
// @param update_number  index of this fragment within the message
// @param fragment       encoded fragment (header then payload)
// @param queue_length   maximum number of clock entries kept per originator
// @param map            originator -> clock -> fragment storage
// @param total_size     reset to 0, then set by defrag() when it is called
// @param clear          when true, a completed message's fragments and its
//                       clock entry are removed
// @return the buffer produced by defrag(), or 0 when no message was
//         completed by this call
//
// NOTE(review): the opening line of every logging call is absent from this
// listing, so only the argument lines are visible below.
637 char* madara::transport::add_fragment(const char* originator, uint64_t clock,
638  uint32_t update_number, const char* fragment, uint32_t queue_length,
639  OriginatorFragmentMap& map, uint64_t & total_size, bool clear)
640 {
641  char* result = 0;
642  total_size = 0;
643 
655  "transport::add_fragment: adding fragment\n");
656 
657  utility::ScopedArray<const char> new_fragment = 0;
658  const char* cur_buffer = fragment;
659  FragmentMessageHeader header;
660  int64_t buffer_remaining = header.encoded_size();
661 
663  "transport::add_fragment:"
664  " reading header from buffer\n");
665 
    // assumes fragment holds at least encoded_size() bytes -- confirm callers
666  header.read(cur_buffer, buffer_remaining);
667 
669  "transport::add_fragment:"
670  " inspecting frag: %s\n", header.to_string().c_str());
671 
672  if(header.size > 0)
673  {
675  "transport::add_fragment:"
676  " reading fragment of size %" PRIu64 " into new buffer\n",
677  header.size);
678 
    // NOTE(review): header.size comes from the decoded header; nothing here
    // checks it against the real length of fragment before the copy.
679  char * new_buffer = new char[header.size];
680  memcpy(new_buffer, fragment, header.size);
681  new_fragment = new_buffer;
682  }
683  else
684  {
685  return 0;
686  }
687 
689  "transport::add_fragment:"
690  " searching for originator %s.\n",
691  originator);
692 
693  OriginatorFragmentMap::iterator orig_map = map.find(originator);
694  if(orig_map == map.end())
695  {
697  "transport::add_fragment:"
698  " creating entry for originator %s.\n",
699  originator);
700 
701  // originator does not exist(1)
    // (stored only; no reassembly is attempted on this path)
702  map[originator][clock][update_number] = new_fragment;
703  }
704  else
705  {
707  "transport::add_fragment:"
708  " originator %s exists in fragment map.\n",
709  originator);
710 
711  ClockFragmentMap& clock_map(orig_map->second);
712  ClockFragmentMap::iterator clock_found = clock_map.find(clock);
713 
714  if(clock_found != clock_map.end())
715  {
716  // we have found the clock entry
717  if(clock_found->second.find(update_number) == clock_found->second.end())
718  {
719  if(clock_found->second.size() != 0)
720  {
722  "transport::add_fragment:"
723  " %s:%d is being added.\n",
724  originator, update_number);
725 
726  // the fragment does not exist yet
727  clock_found->second[update_number] = new_fragment;
728 
729  // check for a new buffer
730  result = defrag(clock_found->second, total_size);
731 
732  if(result && clear)
733  {
736  "transport::add_fragment:"
737  " %s:%d is complete. Deleting fragments.\n",
738  originator, update_number);
739 
740  delete_fragments(clock_found->second);
741  clock_map.erase(clock);
742  }
743  else if(result)
744  {
747  "transport::add_fragment:"
748  " %s:%d is complete. Not deleting fragments.\n",
749  originator, update_number);
750  }
751  else
752  {
755  "transport::add_fragment:"
756  " %s:%d is incomplete. %d size.\n",
757  originator, update_number, (int)clock_found->second.size());
758  }
759  }
760  else
761  {
762  // if we get here, the message has already been defragged
764  "transport::add_fragment:"
765  " %s:%d has been previously defragged and fragments deleted.\n",
766  originator, update_number);
767  }
768  }
769  }
770 
771  else if(clock_map.size() < queue_length)
772  {
773  // clock is new for this originator and the queue still has room
775  "transport::add_fragment:"
776  " %s:%d is being added to queue.\n",
777  originator, update_number);
778 
779  // clock queue has not been exhausted(2)
780  clock_map[clock][update_number] = new_fragment;
781  }
782  else
783  {
785  "transport::add_fragment:"
786  " %s:%d is being added to queue after a deletion\n",
787  originator, update_number);
788 
    // NOTE(review): ClockFragmentMap is keyed by uint64_t but the smallest
    // key is narrowed to uint32_t here. For clocks above 32 bits the
    // comparison, the operator[] lookup (which may insert an empty entry)
    // and the erase below would all use a truncated key.
789  uint32_t oldest =(uint32_t)clock_map.begin()->first;
790 
    // when oldest >= clock nothing is stored and the copied fragment is
    // not kept
791  if(oldest < clock)
792  {
794  "transport::add_fragment:"
795  " deleting fragments.\n",
796  originator, update_number);
797 
798  FragmentMap& fragments = clock_map[oldest];
799 
800  // delete all fragments in the clock entry
801  for(FragmentMap::iterator i = fragments.begin(); i != fragments.end();
802  ++i)
803  {
804  i->second = (const char*)0;
805  }
806 
808  "transport::add_fragment:"
809  " erasing old clock.\n",
810  originator, update_number);
811 
812  // erase the oldest clock fragment map
813  clock_map.erase(oldest);
814 
815  // replace it in the logical queue with the new clock fragment
816  clock_map[clock][update_number] = new_fragment;
817  }
818  }
819  }
820 
821  return result;
822 }
823 
// Assignment from a header: copies clock, quality, size, timestamp, ttl,
// type and updates unless header is this same object. update_number and
// total_size are not assigned in the lines visible here.
// NOTE(review): the declarator and listing lines 829-830 are absent from
// this listing, so any further copies they perform cannot be seen.
825 {
826  if(this != &header)
827  {
828  clock = header.clock;
831  quality = header.quality;
832  size = header.size;
833  timestamp = header.timestamp;
834  ttl = header.ttl;
835  type = header.type;
836  updates = header.updates;
837  }
838 }
839 
// Splits source into fragments and stores them in map keyed by index.
// Each stored buffer is a FragmentMessageHeader followed by a slice of
// source. Nothing is done when fragment_size is 0.
//
// @param source         bytes to split
// @param total_size     number of bytes at source
// @param originator     not referenced in the lines visible here
// @param domain         not referenced in the lines visible here
// @param clock          copied into every fragment header
// @param timestamp      copied into every fragment header
// @param quality        copied into every fragment header
// @param ttl            copied into every fragment header
// @param fragment_size  payload bytes per fragment (the header is added on
//                       top of this when sizing each buffer)
// @param map            receives one entry per fragment
//
// NOTE(review): the declarator, the opening line of every logging call,
// and listing lines 861-863 and 929 are absent from this listing.
841  const char* source, uint64_t total_size,
842  const char* originator, const char* domain,
843  uint64_t clock,
844  uint64_t timestamp,
845  uint32_t quality, unsigned char ttl,
846  uint64_t fragment_size, FragmentMap& map)
847 {
848  if(fragment_size > 0)
849  {
    // NOTE(review): fragment_size is uint64_t but is formatted with %d.
851  "transport::frag:"
852  " fragmenting character stream into %d byte packets.\n",
853  fragment_size);
854 
855  uint64_t data_per_packet = fragment_size;
856 // fragment_size - FragmentMessageHeader::static_encoded_size();
857 
858  const char* buffer = source;
859  FragmentMessageHeader header;
860  header.size = total_size;
861  header.total_size = total_size;
864  header.clock = clock;
865  header.timestamp = timestamp;
866  header.quality = quality;
867  header.ttl = ttl;
868 
869  // if(MessageHeader::message_header_test(source))
870  // {
871  // madara_logger_ptr_log(logger::global_logger.get(), logger::LOG_DETAILED,
872  // "transport::frag:"
873  // " regular message header detected\n",
874  // fragment_size);
875 
876  // MessageHeader contents_header;
877  // int64_t buffer_remaining = contents_header.encoded_size();
878  // contents_header.read(source, buffer_remaining);
879  // header = contents_header;
880  // }
881  // else if(ReducedMessageHeader::message_header_test(source))
882  // {
883  // madara_logger_ptr_log(logger::global_logger.get(), logger::LOG_DETAILED,
884  // "transport::frag:"
885  // " reduced message header detected\n");
886 
887  // ReducedMessageHeader contents_header;
888  // int64_t buffer_remaining = contents_header.encoded_size();
889  // contents_header.read(source, buffer_remaining);
890  // header = contents_header;
891  // }
892 
893 // total_size = header.size;
    // fragment count = ceil(total_size / data_per_packet): integer division
    // here, plus one more below when a partial trailing fragment remains
894  header.updates =(uint32_t)(total_size / data_per_packet);
895  uint64_t last_size = total_size % data_per_packet;
896 
897  if(last_size != 0)
898  {
899  ++header.updates;
900  }
901 
903  "transport::frag:"
904  " inspecting frag: %s\n", header.to_string().c_str());
905 
907  "transport::frag:"
908  " iterating over %d updates. last_size=%d\n",
909  (int)header.updates, (int)last_size);
910 
911  for(uint32_t i = 0; i < header.updates; ++i)
912  {
913  char* new_frag;
914  size_t cur_size;
915  int64_t buffer_remaining;
916  uint64_t actual_data_size;
917 
    // only a partial trailing fragment is shorter than fragment_size
918  if(i == header.updates - 1 && last_size != 0)
919  {
920  cur_size = (size_t)last_size + FragmentMessageHeader::static_encoded_size();
921  }
922  else
923  {
924  cur_size = (size_t)fragment_size + FragmentMessageHeader::static_encoded_size();
925  }
926 
927  buffer_remaining = cur_size;
    // NOTE(review): the right-hand side of the next assignment (listing
    // line 929) is absent from this listing.
928  actual_data_size =
930  new_frag = new char[cur_size];
931 
    // the map entry is set from the start of the allocation before
    // new_frag is advanced past the header further down
932  map[i] = new_frag;
933 
935  "transport::frag:"
936  " writing %d packet of size %d (non-header:%d).\n",
937  (int)i, (int)cur_size, (int)actual_data_size);
938 
939  header.update_number = i;
940  header.size = cur_size;
941 
943  "transport::frag:"
944  " inspecting frag: %s\n", header.to_string().c_str());
945 
946 
    // write() returns the position just past the header; payload goes there
947  new_frag = header.write(new_frag, buffer_remaining);
948  memcpy(new_frag, buffer, (size_t)actual_data_size);
949  buffer += actual_data_size;
    // total_size is a by-value parameter, so this only changes the local copy
950  total_size -= actual_data_size;
951  }
952  }
953 }
954 
// Reports whether the fragments stored for originator at clock form a
// complete message.
//
// @param originator  key of the sending host in map
// @param clock       clock value identifying the fragmented message
// @param map         originator -> clock -> fragment storage
// @return true when fragment 0 is stored and its encoded updates field
//         equals the number of stored fragments, or when the clock entry
//         exists but holds no fragments; false otherwise, including when
//         the originator or clock is unknown
//
// NOTE(review): the declarator and the opening line of every logging call
// are absent from this listing.
956  const char* originator, uint64_t clock, OriginatorFragmentMap& map)
957 {
958  bool result = false;
959 
960  OriginatorFragmentMap::iterator orig_map = map.find(originator);
961 
962  if(orig_map != map.end())
963  {
965  "transport::is_complete:"
966  " %s was found.\n",
967  originator);
968 
969  ClockFragmentMap& clock_map(orig_map->second);
970  ClockFragmentMap::iterator clock_found = clock_map.find(clock);
971 
972  if(clock_found != clock_map.end())
973  {
975  "transport::is_complete:"
976  " %s:%" PRIu64 " was found.\n",
977  originator, clock);
978 
979  uint64_t size = clock_found->second.size();
980  FragmentMap::iterator i = clock_found->second.find(0);
981 
982  if(i != clock_found->second.end())
983  {
    // NOTE(review): this branch runs when fragment 0 WAS found, although
    // the message text reads "0 == fragmap.end".
985  "transport::is_complete:"
986  " %s:%" PRIu64 ": 0 == fragmap.end.\n",
987  originator, clock);
988 
    // expected fragment count is read directly from fragment 0's header
989  if(FragmentMessageHeader::get_updates(i->second.get()) == size)
990  {
992  "transport::is_complete:"
993  " %s:%" PRIu64 ": size == %" PRIu64 ", updates == %" PRIu32
994  ". COMPLETE\n",
995  originator, clock, size,
996  FragmentMessageHeader::get_updates(i->second.get()));
997 
998  result = true;
999  }
1000  else
1001  {
1003  "transport::is_complete:"
1004  " %s:%" PRIu64 ": size == %" PRIu64 ", updates == %" PRIu32
1005  ". INCOMPLETE\n",
1006  originator, clock, size,
1007  FragmentMessageHeader::get_updates(i->second.get()));
1008  }
1009  }
    // an existing but empty clock entry is reported as complete
1010  else if(size == 0)
1011  {
1013  "transport::is_complete:"
1014  " %s:%" PRIu64 ": size == 0 and i == 0. COMPLETE\n",
1015  originator, clock);
1016 
1017  result = true;
1018  }
1019  }
1020  else
1021  {
1023  "transport::is_complete:"
1024  " %s:%" PRIu64 " was not found.\n",
1025  originator, clock);
1026  }
1027  }
1028  else
1029  {
1031  "transport::is_complete:"
1032  " %s was not found.\n",
1033  originator);
1034  }
1035 
1036  return result;
1037 }
1038 
1039 bool madara::transport::exists(const char* originator, uint64_t clock,
1040  uint32_t update_number, OriginatorFragmentMap& map)
1041 {
1042  bool result = false;
1043 
1044  OriginatorFragmentMap::iterator orig_map = map.find(originator);
1045 
1046  if(orig_map != map.end())
1047  {
1048  ClockFragmentMap& clock_map(orig_map->second);
1049  ClockFragmentMap::iterator clock_found = clock_map.find(clock);
1050 
1051  if(clock_found != clock_map.end())
1052  {
1053  if(clock_found->second.find(update_number) != clock_found->second.end())
1054  {
1055  result = true;
1056  }
1057  }
1058  }
1059 
1060  return result;
1061 }
#define FRAGMENTATION_MADARA_ID
Definition: Fragmentation.h:25
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
#define MADARA_DOMAIN_MAX_LENGTH
Definition: MessageHeader.h:23
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:21
#define MAX_ORIGINATOR_LENGTH
Definition: MessageHeader.h:28
An exception for general memory errors like out-of-memory.
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:51
virtual std::string to_string(void) override
Converts the structure into a human-readable string.
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual const char * read(const char *buffer, int64_t &buffer_remaining) override
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining.
virtual bool equals(const MessageHeader &other) override
Compares the fields of this instance to another instance.
static uint32_t get_updates(const char *buffer)
Returns the number of updates indicated in the header.
virtual char * write(char *buffer, int64_t &buffer_remaining) override
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining.
void operator=(MessageHeader &header)
Assignment operator for regular message header.
virtual uint32_t encoded_size(void) const override
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
uint32_t quality
the quality of the message sender
uint64_t timestamp
the timestamp of the sender when the message was generated
char domain[32]
the domain that this message is intended for
char originator[64]
the originator of the message (host:port)
uint32_t updates
the number of knowledge variable updates in the message
uint32_t type
the type of message
uint64_t clock
the clock of the sender when the message was generated
uint64_t size
the size of this header plus the updates
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
char madara_id[8]
the identifier of this transport (MADARA_IDENTIFIER)
unsigned char ttl
time to live (number of rebroadcasts to perform after the original send)
This template class provides transparent reference counting of its template parameter T.
Definition: ScopedArray.h:23
constexpr string_t string
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
std::map< uint32_t, utility::ScopedArray< const char > > FragmentMap
Map of fragment identifiers to fragments.
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, uint64_t &total_size, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
MADARA_EXPORT bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Checks whether all fragments of the message identified by an originator and clock have been stored in a fragment map.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
MADARA_EXPORT void frag(const char *source, uint64_t total_size, const char *originator, const char *domain, uint64_t clock, uint64_t timestamp, uint32_t quality, unsigned char ttl, uint64_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
MADARA_EXPORT char * defrag(FragmentMap &map, uint64_t &total_size)
Pieces together a fragment map into a single buffer.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.inl:134
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Definition: Utility.cpp:376