25 return sizeof(uint64_t) * 4
33 return sizeof(uint64_t) * 4
48 std::stringstream buffer;
49 buffer <<
"frag 12: ";
50 buffer <<
"index(4:" << update_number <<
"), ";
51 buffer <<
"total_size(8:" << total_size <<
"), msg: ";
58 const char* buffer, int64_t& buffer_remaining)
61 if((
size_t)buffer_remaining >=
sizeof(size))
63 memcpy(&size, buffer,
sizeof(size));
65 buffer +=
sizeof(size);
69 std::stringstream buffer;
70 buffer <<
"FragmentMessageHeader::read: ";
71 buffer <<
sizeof(size) <<
" byte size field cannot fit in ";
72 buffer << buffer_remaining <<
" byte buffer\n";
76 buffer_remaining -=
sizeof(size);
86 std::stringstream buffer;
87 buffer <<
"FragmentMessageHeader::read: ";
89 buffer << buffer_remaining <<
" byte buffer\n";
103 std::stringstream buffer;
104 buffer <<
"FragmentMessageHeader::read: ";
106 buffer <<
" fit in ";
107 buffer << buffer_remaining <<
" byte buffer\n";
121 std::stringstream buffer;
122 buffer <<
"FragmentMessageHeader::read: ";
124 buffer <<
" fit in ";
125 buffer << buffer_remaining <<
" byte buffer\n";
132 if((
size_t)buffer_remaining >=
sizeof(type))
134 memcpy(&type, buffer,
sizeof(type));
136 buffer +=
sizeof(type);
140 std::stringstream buffer;
141 buffer <<
"FragmentMessageHeader::read: ";
142 buffer <<
sizeof(type) <<
" byte type encoding cannot";
143 buffer <<
" fit in ";
144 buffer << buffer_remaining <<
" byte buffer\n";
148 buffer_remaining -=
sizeof(type);
151 if((
size_t)buffer_remaining >=
sizeof(updates))
153 memcpy(&updates, buffer,
sizeof(updates));
155 buffer +=
sizeof(updates);
159 std::stringstream buffer;
160 buffer <<
"FragmentMessageHeader::read: ";
161 buffer <<
sizeof(updates) <<
" byte updates encoding cannot";
162 buffer <<
" fit in ";
163 buffer << buffer_remaining <<
" byte buffer\n";
167 buffer_remaining -=
sizeof(updates);
170 if((
size_t)buffer_remaining >=
sizeof(quality))
172 memcpy(&quality, buffer,
sizeof(quality));
174 buffer +=
sizeof(quality);
178 std::stringstream buffer;
179 buffer <<
"FragmentMessageHeader::read: ";
180 buffer <<
sizeof(quality) <<
" byte quality encoding cannot";
181 buffer <<
" fit in ";
182 buffer << buffer_remaining <<
" byte buffer\n";
186 buffer_remaining -=
sizeof(quality);
189 if((
size_t)buffer_remaining >=
sizeof(clock))
191 memcpy(&clock, buffer,
sizeof(clock));
193 buffer +=
sizeof(clock);
197 std::stringstream buffer;
198 buffer <<
"FragmentMessageHeader::read: ";
199 buffer <<
sizeof(clock) <<
" byte clock encoding cannot";
200 buffer <<
" fit in ";
201 buffer << buffer_remaining <<
" byte buffer\n";
205 buffer_remaining -=
sizeof(clock);
208 if((
size_t)buffer_remaining >=
sizeof(timestamp))
210 memcpy(×tamp, buffer,
sizeof(timestamp));
212 buffer +=
sizeof(timestamp);
216 std::stringstream buffer;
217 buffer <<
"FragmentMessageHeader::read: ";
218 buffer <<
sizeof(timestamp) <<
" byte timestamp encoding cannot";
219 buffer <<
" fit in ";
220 buffer << buffer_remaining <<
" byte buffer\n";
224 buffer_remaining -=
sizeof(timestamp);
227 if(buffer_remaining >= 1)
229 memcpy(&ttl, buffer, 1);
234 std::stringstream buffer;
235 buffer <<
"FragmentMessageHeader::read: ";
236 buffer <<
"1 byte ttl encoding cannot";
237 buffer <<
" fit in ";
238 buffer << buffer_remaining <<
" byte buffer\n";
242 buffer_remaining -= 1;
245 if((
size_t)buffer_remaining >=
sizeof(update_number))
247 memcpy(&update_number, buffer,
sizeof(update_number));
249 buffer +=
sizeof(update_number);
253 std::stringstream buffer;
254 buffer <<
"FragmentMessageHeader::read: ";
255 buffer <<
sizeof(update_number) <<
" byte update number encoding cannot";
256 buffer <<
" fit in ";
257 buffer << buffer_remaining <<
" byte buffer\n";
261 buffer_remaining -=
sizeof(update_number);
264 if((
size_t)buffer_remaining >=
sizeof(total_size))
266 memcpy(&total_size, buffer,
sizeof(total_size));
268 buffer +=
sizeof(total_size);
272 std::stringstream buffer;
273 buffer <<
"FragmentMessageHeader::read: ";
274 buffer <<
sizeof(total_size) <<
" byte total size encoding cannot";
275 buffer <<
" fit in ";
276 buffer << buffer_remaining <<
" byte buffer\n";
280 buffer_remaining -=
sizeof(total_size);
286 char* buffer, int64_t& buffer_remaining)
289 if((
size_t)buffer_remaining >=
sizeof(size))
292 buffer +=
sizeof(size);
296 std::stringstream buffer;
297 buffer <<
"FragmentMessageHeader::write: ";
298 buffer <<
sizeof(size) <<
" byte size encoding cannot";
299 buffer <<
" fit in ";
300 buffer << buffer_remaining <<
" byte buffer\n";
304 buffer_remaining -=
sizeof(size);
314 std::stringstream buffer;
315 buffer <<
"FragmentMessageHeader::write: ";
317 buffer <<
" fit in ";
318 buffer << buffer_remaining <<
" byte buffer\n";
332 std::stringstream buffer;
333 buffer <<
"FragmentMessageHeader::write: ";
335 buffer <<
" fit in ";
336 buffer << buffer_remaining <<
" byte buffer\n";
350 std::stringstream buffer;
351 buffer <<
"FragmentMessageHeader::write: ";
353 buffer <<
" fit in ";
354 buffer << buffer_remaining <<
" byte buffer\n";
361 if((
size_t)buffer_remaining >=
sizeof(type))
364 buffer +=
sizeof(type);
368 std::stringstream buffer;
369 buffer <<
"FragmentMessageHeader::write: ";
370 buffer <<
sizeof(type) <<
" byte type encoding cannot";
371 buffer <<
" fit in ";
372 buffer << buffer_remaining <<
" byte buffer\n";
376 buffer_remaining -=
sizeof(type);
379 if((
size_t)buffer_remaining >=
sizeof(updates))
382 buffer +=
sizeof(updates);
386 std::stringstream buffer;
387 buffer <<
"FragmentMessageHeader::write: ";
388 buffer <<
sizeof(updates) <<
" byte updates encoding cannot";
389 buffer <<
" fit in ";
390 buffer << buffer_remaining <<
" byte buffer\n";
394 buffer_remaining -=
sizeof(updates);
397 if((
size_t)buffer_remaining >=
sizeof(quality))
400 buffer +=
sizeof(quality);
404 std::stringstream buffer;
405 buffer <<
"FragmentMessageHeader::write: ";
406 buffer <<
sizeof(quality) <<
" byte quality encoding cannot";
407 buffer <<
" fit in ";
408 buffer << buffer_remaining <<
" byte buffer\n";
412 buffer_remaining -=
sizeof(quality);
415 if((
size_t)buffer_remaining >=
sizeof(clock))
418 buffer +=
sizeof(clock);
422 std::stringstream buffer;
423 buffer <<
"FragmentMessageHeader::write: ";
424 buffer <<
sizeof(clock) <<
" byte clock encoding cannot";
425 buffer <<
" fit in ";
426 buffer << buffer_remaining <<
" byte buffer\n";
430 buffer_remaining -=
sizeof(clock);
433 if((
size_t)buffer_remaining >=
sizeof(timestamp))
436 buffer +=
sizeof(timestamp);
440 std::stringstream buffer;
441 buffer <<
"FragmentMessageHeader::write: ";
442 buffer <<
sizeof(timestamp) <<
" byte timestamp encoding cannot";
443 buffer <<
" fit in ";
444 buffer << buffer_remaining <<
" byte buffer\n";
448 buffer_remaining -=
sizeof(timestamp);
450 if(buffer_remaining >= 1)
452 memcpy(buffer, &ttl, 1);
457 std::stringstream buffer;
458 buffer <<
"FragmentMessageHeader::write: ";
459 buffer <<
"1 byte ttl encoding cannot";
460 buffer <<
" fit in ";
461 buffer << buffer_remaining <<
" byte buffer\n";
465 buffer_remaining -= 1;
468 if((
size_t)buffer_remaining >=
sizeof(update_number))
471 buffer +=
sizeof(update_number);
475 std::stringstream buffer;
476 buffer <<
"FragmentMessageHeader::write: ";
477 buffer <<
sizeof(update_number) <<
" byte update number encoding cannot";
478 buffer <<
" fit in ";
479 buffer << buffer_remaining <<
" byte buffer\n";
483 buffer_remaining -=
sizeof(update_number);
486 if((
size_t)buffer_remaining >=
sizeof(total_size))
489 buffer +=
sizeof(total_size);
493 std::stringstream buffer;
494 buffer <<
"FragmentMessageHeader::write: ";
495 buffer <<
sizeof(total_size) <<
" byte total size encoding cannot";
496 buffer <<
" fit in ";
497 buffer << buffer_remaining <<
" byte buffer\n";
501 buffer_remaining -=
sizeof(total_size);
512 return size == rhs->
size && type == rhs->
type && updates == rhs->
updates &&
528 " defragging fragment map\n");
530 FragmentMap::iterator i = map.find(0);
535 const char* buffer = header.
read(i->second.get(), buffer_remaining);
539 " inspecting frag: %s\n", header.
to_string().c_str());
542 if(header.
updates <= map.size())
546 " the map is large enough to contain updates\n");
573 result =
new char[size];
577 buffer_remaining = actual_size;
582 "transport::defrag: copying buffer to lhs\n");
584 memcpy(lhs, buffer, buffer_remaining);
585 buffer += actual_size;
594 for(; i != map.end(); ++i)
597 "transport::defrag: reading header of new fragment\n");
600 buffer = header.
read(i->second.get(), buffer_remaining);
602 buffer_remaining = actual_size;
606 " piecing together frag: %s\n", header.
to_string().c_str());
612 "transport::defrag: copying buffer to lhs\n");
614 memcpy(lhs, buffer, buffer_remaining);
615 buffer += actual_size;
630 for(FragmentMap::iterator i = map.begin(); i != map.end(); ++i)
632 i->second = (
const char*)0;
638 uint32_t update_number,
const char* fragment, uint32_t queue_length,
655 "transport::add_fragment: adding fragment\n");
658 const char* cur_buffer = fragment;
663 "transport::add_fragment:"
664 " reading header from buffer\n");
666 header.
read(cur_buffer, buffer_remaining);
669 "transport::add_fragment:"
670 " inspecting frag: %s\n", header.
to_string().c_str());
675 "transport::add_fragment:"
676 " reading fragment of size %" PRIu64
" into new buffer\n",
679 char * new_buffer =
new char[header.
size];
680 memcpy(new_buffer, fragment, header.
size);
681 new_fragment = new_buffer;
689 "transport::add_fragment:"
690 " searching for originator %s.\n",
693 OriginatorFragmentMap::iterator orig_map = map.find(originator);
694 if(orig_map == map.end())
697 "transport::add_fragment:"
698 " creating entry for originator %s.\n",
702 map[originator][clock][update_number] = new_fragment;
707 "transport::add_fragment:"
708 " originator %s exists in fragment map.\n",
712 ClockFragmentMap::iterator clock_found = clock_map.find(clock);
714 if(clock_found != clock_map.end())
717 if(clock_found->second.find(update_number) == clock_found->second.end())
719 if(clock_found->second.size() != 0)
722 "transport::add_fragment:"
723 " %s:%d is being added.\n",
724 originator, update_number);
727 clock_found->second[update_number] = new_fragment;
730 result =
defrag(clock_found->second, total_size);
736 "transport::add_fragment:"
737 " %s:%d is complete. Deleting fragments.\n",
738 originator, update_number);
741 clock_map.erase(clock);
747 "transport::add_fragment:"
748 " %s:%d is complete. Not deleting fragments.\n",
749 originator, update_number);
755 "transport::add_fragment:"
756 " %s:%d is incomplete. %d size.\n",
757 originator, update_number, (
int)clock_found->second.size());
764 "transport::add_fragment:"
765 " %s:%d has been previously defragged and fragments deleted.\n",
766 originator, update_number);
771 else if(clock_map.size() < queue_length)
775 "transport::add_fragment:"
776 " %s:%d is being added to queue.\n",
777 originator, update_number);
780 clock_map[clock][update_number] = new_fragment;
785 "transport::add_fragment:"
786 " %s:%d is being added to queue after a deletion\n",
787 originator, update_number);
789 uint32_t oldest =(uint32_t)clock_map.begin()->first;
794 "transport::add_fragment:"
795 " deleting fragments.\n",
796 originator, update_number);
801 for(FragmentMap::iterator i = fragments.begin(); i != fragments.end();
804 i->second = (
const char*)0;
808 "transport::add_fragment:"
809 " erasing old clock.\n",
810 originator, update_number);
813 clock_map.erase(oldest);
816 clock_map[clock][update_number] = new_fragment;
828 clock = header.
clock;
841 const char* source, uint64_t total_size,
842 const char* originator,
const char* domain,
845 uint32_t quality,
unsigned char ttl,
848 if(fragment_size > 0)
852 " fragmenting character stream into %d byte packets.\n",
855 uint64_t data_per_packet = fragment_size;
858 const char* buffer = source;
860 header.
size = total_size;
864 header.
clock = clock;
894 header.
updates =(uint32_t)(total_size / data_per_packet);
895 uint64_t last_size = total_size % data_per_packet;
904 " inspecting frag: %s\n", header.
to_string().c_str());
908 " iterating over %d updates. last_size=%d\n",
909 (
int)header.
updates, (
int)last_size);
911 for(uint32_t i = 0; i < header.
updates; ++i)
915 int64_t buffer_remaining;
916 uint64_t actual_data_size;
918 if(i == header.
updates - 1 && last_size != 0)
927 buffer_remaining = cur_size;
930 new_frag =
new char[cur_size];
936 " writing %d packet of size %d (non-header:%d).\n",
937 (
int)i, (
int)cur_size, (
int)actual_data_size);
940 header.
size = cur_size;
944 " inspecting frag: %s\n", header.
to_string().c_str());
947 new_frag = header.
write(new_frag, buffer_remaining);
948 memcpy(new_frag, buffer, (
size_t)actual_data_size);
949 buffer += actual_data_size;
950 total_size -= actual_data_size;
960 OriginatorFragmentMap::iterator orig_map = map.find(originator);
962 if(orig_map != map.end())
965 "transport::is_complete:"
970 ClockFragmentMap::iterator clock_found = clock_map.find(clock);
972 if(clock_found != clock_map.end())
975 "transport::is_complete:"
976 " %s:%" PRIu64
" was found.\n",
979 uint64_t size = clock_found->second.size();
980 FragmentMap::iterator i = clock_found->second.find(0);
982 if(i != clock_found->second.end())
985 "transport::is_complete:"
986 " %s:%" PRIu64
": 0 == fragmap.end.\n",
992 "transport::is_complete:"
993 " %s:%" PRIu64
": size == %" PRIu64
", updates == %" PRIu32
995 originator, clock, size,
1003 "transport::is_complete:"
1004 " %s:%" PRIu64
": size == %" PRIu64
", updates == %" PRIu32
1006 originator, clock, size,
1013 "transport::is_complete:"
1014 " %s:%" PRIu64
": size == 0 and i == 0. COMPLETE\n",
1023 "transport::is_complete:"
1024 " %s:%" PRIu64
" was not found.\n",
1031 "transport::is_complete:"
1032 " %s was not found.\n",
1042 bool result =
false;
1044 OriginatorFragmentMap::iterator orig_map = map.find(originator);
1046 if(orig_map != map.end())
1049 ClockFragmentMap::iterator clock_found = clock_map.find(clock);
1051 if(clock_found != clock_map.end())
1053 if(clock_found->second.find(update_number) != clock_found->second.end())
#define FRAGMENTATION_MADARA_ID
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
An exception for general memory errors like out-of-memory.
This template class provides transparent reference counting of its template parameter T.
T get(const KnowledgeRecord &kr)
Get the value of a KnowlegeRecord.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
std::map< uint32_t, utility::ScopedArray< const char > > FragmentMap
Map of fragment identifiers to fragments.
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, uint64_t &total_size, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
MADARA_EXPORT bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Breaks a large packet into smaller packets.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
MADARA_EXPORT void frag(const char *source, uint64_t total_size, const char *originator, const char *domain, uint64_t clock, uint64_t timestamp, uint32_t quality, unsigned char ttl, uint64_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
MADARA_EXPORT char * defrag(FragmentMap &map, uint64_t &total_size)
Pieces together a fragment map into a single buffer.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.