MADARA  3.4.1
Transport.cpp
Go to the documentation of this file.
1 #include "Transport.h"
2 
6 
7 #include <algorithm>
8 
9 namespace madara
10 {
11 namespace transport
12 {
13 Base::Base(const std::string& id, TransportSettings& new_settings,
15  : is_valid_(false),
16  shutting_down_(false),
17  id_(id),
18  settings_(new_settings),
19  context_(context)
20 
21 #ifndef _MADARA_NO_KARL_
22  ,
23  on_data_received_(context.get_logger())
24 #endif // _MADARA_NO_KARL_
25 {
28 }
29 
31 
32 int Base::setup(void)
33 {
34  // check for an on_data_received ruleset
35  if(settings_.on_data_received_logic.length() != 0)
36  {
38  "transport::Base::setup"
39  " setting rules to %s\n",
41 
42 #ifndef _MADARA_NO_KARL_
43  expression::Interpreter interpreter;
46 #endif // _MADARA_NO_KARL_
47  }
48  else
49  {
51  "transport::Base::setup"
52  " no permanent rules were set\n");
53  }
54 
55  // setup the send buffer
56  if(settings_.queue_length > 0)
57  buffer_ = new char[settings_.queue_length];
58 
59  // if no read domains have been set, add the write domain as a read domain
60  if(settings_.num_read_domains() == 0)
61  {
63  "transport::Base::setup"
64  " no read domains set. Adding write domain (%s)\n",
65  settings_.write_domain.c_str());
66 
68  }
69  else
70  {
72  "transport::Base::setup"
73  " settings configured with %d read domains\n",
75  }
76 
77  if(settings_.num_read_domains() > 0 &&
79  {
80  std::vector<std::string> domains;
81  settings_.get_read_domains(domains);
82 
83  std::stringstream buffer;
84 
85  for(unsigned int i = 0; i < domains.size(); ++i)
86  {
87  buffer << domains[i];
88 
89  if(i != domains.size() - 1)
90  {
91  buffer << ", ";
92  }
93  }
94 
96  "transport::Base::setup"
97  " Write domain: %s. Read domains: %s\n",
98  settings_.write_domain.c_str(), buffer.str().c_str());
99  }
100 
101  return validate_transport();
102 }
103 
104 void Base::close(void)
105 {
107 }
108 
109 int process_received_update(const char* buffer, uint32_t bytes_read,
110  const std::string& id, knowledge::ThreadSafeContext& context,
111  const QoSTransportSettings& settings, BandwidthMonitor& send_monitor,
112  BandwidthMonitor& receive_monitor,
113  knowledge::KnowledgeMap& rebroadcast_records,
114 #ifndef _MADARA_NO_KARL_
115  knowledge::CompiledExpression& on_data_received,
116 #endif // _MADARA_NO_KARL_
117 
118  const char* print_prefix, const char* remote_host, MessageHeader*& header)
119 {
120  // reset header to 0, so it is safe to delete
121  header = 0;
122 
123  int max_buffer_size = (int)bytes_read;
124 
125  // tell the receive bandwidth monitor about the transaction
126  receive_monitor.add(bytes_read);
127 
129  "%s:"
130  " Receive bandwidth = %" PRIu64 " B/s\n",
131  print_prefix, receive_monitor.get_bytes_per_second());
132 
133  bool is_reduced = false;
134  bool is_fragment = false;
135 
137  "%s:"
138  " calling decode filters on %" PRIu32 " bytes\n",
139  print_prefix, bytes_read);
140 
141  // call decodes, if applicable
142  bytes_read = (uint32_t)settings.filter_decode(
143  (char*)buffer, max_buffer_size, max_buffer_size);
144 
146  "%s:"
147  " Decoding resulted in %" PRIu32 " final bytes\n",
148  print_prefix, bytes_read);
149 
150  // setup buffer remaining, used by the knowledge record read method
151  int64_t buffer_remaining = (int64_t)bytes_read;
152 
153  // clear the rebroadcast records
154  rebroadcast_records.clear();
155 
156  // receive records will be what we pass to the aggregate filter
157  knowledge::KnowledgeMap updates;
158 
159  // if a key appears multiple times, keep the earlier values so they can be added to the buffer history
160  std::map<std::string, std::vector<knowledge::KnowledgeRecord>> past_updates;
161 
162  // check the buffer for a reduced message header
163  if(bytes_read >= ReducedMessageHeader::static_encoded_size() &&
165  {
167  "%s:"
168  " processing reduced KaRL message from %s\n",
169  print_prefix, remote_host);
170 
171  header = new ReducedMessageHeader();
172  is_reduced = true;
173  }
174  else if(bytes_read >= MessageHeader::static_encoded_size() &&
176  {
178  "%s:"
179  " %s: processing KaRL message from %s\n",
180  print_prefix, id.c_str(), remote_host);
181 
182  header = new MessageHeader();
183  }
184  else if(bytes_read >= FragmentMessageHeader::static_encoded_size() &&
186  {
188  "%s:"
189  " processing KaRL fragment message from %s\n",
190  print_prefix, remote_host);
191 
192  header = new FragmentMessageHeader();
193  is_fragment = true;
194  }
195  else if(bytes_read >= 8 + MADARA_IDENTIFIER_LENGTH)
196  {
197  // get the text that appears as identifier.
198  char identifier[MADARA_IDENTIFIER_LENGTH];
199  utility::strncpy_safe(identifier, buffer + 8, MADARA_IDENTIFIER_LENGTH);
200 
202  "%s:"
203  " dropping non-KaRL message with id %s from %s\n",
204  print_prefix, identifier, remote_host);
205 
206  return -1;
207  }
208  else
209  {
211  "%s:"
212  " dropping too short message from %s (length %i)\n",
213  print_prefix, remote_host, bytes_read);
214 
215  return -1;
216  }
217 
218  const char* update = header->read(buffer, buffer_remaining);
219 
221  "%s:"
222  " header info: %s\n",
223  print_prefix, header->to_string().c_str());
224 
225  if(header->size < bytes_read)
226  {
228  "%s:"
229  " Message header.size (%" PRIu64 " bytes) is less than actual"
230  " bytes read (%" PRIu32 " bytes). Dropping message.\n",
231  print_prefix, header->size, bytes_read);
232 
233  return -1;
234  }
235 
236  if(is_fragment && exists(header->originator, header->clock,
237  ((FragmentMessageHeader*)header)->update_number,
238  settings.fragment_map))
239  {
241  "%s:"
242  " Fragment already exists in fragment map. Dropping.\n",
243  print_prefix);
244 
245  return -1;
246  }
247 
248  if(!is_reduced)
249  {
250  // reject the message if it is us as the originator (no update necessary)
251  if(id == header->originator)
252  {
254  "%s:"
255  " %s: dropping message from ourself\n",
256  print_prefix, id.c_str());
257 
258  return -2;
259  }
260  else
261  {
263  "%s:"
264  " remote id (%s) is not our own\n",
265  print_prefix, remote_host);
266  }
267 
268  if(settings.is_trusted(remote_host))
269  {
271  "%s: remote id (%s) is trusted\n", print_prefix, remote_host);
272  }
273  else
274  {
276  "%s:"
277  " dropping message from untrusted peer (%s\n",
278  print_prefix, remote_host);
279 
280  // the header is not deleted here; it is left in the header out-parameter, presumably for the caller (svc loop) to clean up
281  return -3;
282  }
283 
284  std::string originator(header->originator);
285 
286  if(settings.is_trusted(originator))
287  {
289  "%s:"
290  " originator (%s) is trusted\n",
291  print_prefix, originator.c_str());
292  }
293  else
294  {
296  "%s:"
297  " dropping message from untrusted originator (%s)\n",
298  print_prefix, originator.c_str());
299 
300  return -4;
301  }
302 
303  // reject the message if it is from a different domain
304  if(!settings.is_reading_domain(header->domain))
305  {
307  "%s:"
308  " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
309  print_prefix, remote_host, header->domain);
310 
311  // the header is not deleted here; it is left in the header out-parameter, presumably for the caller (svc loop) to clean up
312  return -5;
313  }
314  else
315  {
317  "%s:"
318  " remote id (%s) message is in our domain\n",
319  print_prefix, remote_host);
320  }
321  }
322 
323  // fragments are special cases
324  if(is_fragment)
325  {
326  // grab the fragment header
327  FragmentMessageHeader* frag_header =
328  dynamic_cast<FragmentMessageHeader*>(header);
329 
330  // total size of the fragmented packet
331  uint64_t total_size = 0;
332 
334  "%s:"
335  " Processing fragment %" PRIu32 " of %s:%" PRIu64 ".\n",
336  print_prefix, frag_header->update_number, frag_header->originator,
337  frag_header->clock);
338 
339  // add the fragment and attempt to defrag the message
340  char* message = transport::add_fragment(frag_header->originator,
341  frag_header->clock, frag_header->update_number, buffer,
342  settings.fragment_queue_length, settings.fragment_map, total_size, true);
343 
344  // if we have no return message, we may have previously defragged it
345  if(!message || total_size == 0)
346  {
347  return 0;
348  }
349 
350  // copy the intermediate buffer to the user-allocated buffer
351  char* buffer_override = (char*)buffer;
352  memcpy(buffer_override, message, total_size);
353 
354  // clean up the intermediate buffer. We should really have a zero-copy path here
355  delete[] message;
356 
357  int decode_result = (uint32_t)settings.filter_decode(
358  (char*)buffer, total_size, settings.queue_length);
359 
360  if (decode_result <= 0)
361  {
363  "%s:"
364  " ERROR: Unable to decode fragments. Likely incorrect filters.\n",
365  print_prefix);
366 
367  return 0;
368  }
369  else
370  {
372  "%s:"
373  " Message has been pieced together from fragments. Processing...\n",
374  print_prefix);
375 
381  buffer_remaining = (int64_t)decode_result;
382  if(buffer_remaining <= settings.queue_length &&
383  buffer_remaining > (int64_t)MessageHeader::static_encoded_size ())
384  {
385  delete header;
386 
387  // check the buffer for a reduced message header
389  {
391  "%s:"
392  " processing reduced KaRL message from %s\n",
393  print_prefix, remote_host);
394 
395  header = new ReducedMessageHeader();
396  is_reduced = true;
397  update = header->read(buffer, buffer_remaining);
398  }
399  else if(MessageHeader::message_header_test(buffer))
400  {
402  "%s:"
403  " processing KaRL message from %s\n",
404  print_prefix, remote_host);
405 
406  header = new MessageHeader();
407  update = header->read(buffer, buffer_remaining);
408  }
409  else
410  {
412  "%s:"
413  " ERROR: defrag resulted in unknown message header.\n",
414  print_prefix);
415 
416  return 0;
417  }
418 
419  madara_logger_log(context.get_logger(),
420  logger::LOG_MAJOR, "%s:"
421  " past fragment header create.\n",
422  print_prefix);
423  } // end if buffer remaining
424  } // end if decode didn't break
425  } // end if is fragment
426 
427 
428  int actual_updates = 0;
429  uint64_t current_time = utility::get_time();
430  double deadline = settings.get_deadline();
431 
432  madara_logger_log(context.get_logger(),
433  logger::LOG_MAJOR, "%s:"
434  " create transport_context with: "
435  "originator(%s), domain(%s), remote(%s), time(%" PRIu64 ").\n",
436  print_prefix, header->originator, header->domain, remote_host,
437  header->timestamp);
438 
440  receive_monitor.get_bytes_per_second(),
441  send_monitor.get_bytes_per_second(), header->timestamp, current_time,
442  header->domain, header->originator, remote_host);
443 
444  madara_logger_log(context.get_logger(),
445  logger::LOG_MAJOR, "%s:"
446  " past transport_context create.\n",
447  print_prefix);
448 
449  uint64_t latency(0);
450 
451  if(deadline >= 1.0)
452  {
453  if(header->timestamp < current_time)
454  {
455  latency = current_time - header->timestamp;
456 
457  if(latency > deadline)
458  {
460  "%s:"
461  " deadline violation (latency is %" PRIu64 ", deadline is %f).\n",
462  print_prefix, latency, deadline);
463 
464  return -6;
465  }
466  }
467  else
468  {
470  "%s:"
471  " Cannot compute message latency."
472  " Message header timestamp is in the future."
473  " message.timestamp = %" PRIu64 ", cur_timestamp = %" PRIu64 ".\n",
474  print_prefix, header->timestamp, current_time);
475  }
476  }
477 
479  "%s:"
480  " iterating over the %" PRIu32 " updates\n",
481  print_prefix, header->updates);
482 
483  // temporary record for reading from the updates buffer
485  record.quality = header->quality;
486  record.clock = header->clock;
487  std::string key;
488 
489  bool dropped = false;
490 
491  if(send_monitor.is_bandwidth_violated(settings.get_send_bandwidth_limit()))
492  {
493  dropped = true;
495  "%s:"
496  " Send monitor has detected violation of bandwidth limit."
497  " Dropping packet from rebroadcast list\n",
498  print_prefix);
499  }
500  else if(receive_monitor.is_bandwidth_violated(
501  settings.get_total_bandwidth_limit()))
502  {
503  dropped = true;
505  "%s:"
506  " Receive monitor has detected violation of bandwidth limit."
507  " Dropping packet from rebroadcast list...\n",
508  print_prefix);
509  }
510  else if(settings.get_participant_ttl() < header->ttl)
511  {
512  dropped = true;
514  "%s:"
515  " Transport participant TTL is lower than header ttl."
516  " Dropping packet from rebroadcast list...\n",
517  print_prefix);
518  }
519 
521  "%s:"
522  " Applying %" PRIu32 " updates\n",
523  print_prefix, header->updates);
524 
525  const auto add_record = [&](const std::string& key,
527  auto& entry = updates[key];
528  if(entry.exists())
529  {
530  using std::swap;
531 
532  swap(rec, entry);
533 
534  past_updates[key].emplace_back(std::move(rec));
535  }
536  else
537  {
538  entry = std::move(rec);
539  }
540  };
541 
542  // iterate over the updates
543  for(uint32_t i = 0; i < header->updates; ++i)
544  {
545  // read converts everything into host format from the update stream
546  update = record.read(update, key, buffer_remaining);
547 
548  if(buffer_remaining < 0)
549  {
551  "%s:"
552  " unable to process message. Buffer remaining is negative."
553  " Server is likely being targeted by custom KaRL tools.\n",
554  print_prefix);
555 
556  // we do not delete the header as this will be cleaned up later
557  break;
558  }
559  else
560  {
562  "%s:"
563  " Applying receive filter to %s (clk %i, qual %i) = %s\n",
564  print_prefix, key.c_str(), record.clock, record.quality,
565  record.to_string().c_str());
566 
567  record = settings.filter_receive(record, key, transport_context);
568 
569  if(record.exists())
570  {
572  "%s:"
573  " Filter results for %s were %s\n",
574  print_prefix, key.c_str(), record.to_string().c_str());
575 
576  add_record(key, record);
577  }
578  else
579  {
581  "%s:"
582  " Filter resulted in dropping %s\n",
583  print_prefix, key.c_str());
584  }
585  }
586  }
587 
588  const knowledge::KnowledgeMap& additionals = transport_context.get_records();
589 
590  if(additionals.size() > 0)
591  {
593  "%s:"
594  " %lld additional records being handled after receive.\n",
595  print_prefix, (long long)additionals.size());
596 
597  for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
598  i != additionals.end(); ++i)
599  {
600  add_record(i->first, i->second);
601  }
602 
603  transport_context.clear_records();
604 
605  if(header->ttl < 2)
606  header->ttl = 2;
607 
608  // modify originator to indicate we are the originator of modifications
609  utility::strncpy_safe(header->originator, id.c_str(), sizeof(header->originator));
610  }
611 
612  // apply aggregate receive filters
613  if(settings.get_number_of_receive_aggregate_filters() > 0 &&
614  (updates.size() > 0 || header->type == transport::REGISTER))
615  {
617  "%s:"
618  " Applying aggregate receive filters.\n",
619  print_prefix);
620 
621  settings.filter_receive(updates, transport_context);
622  }
623  else
624  {
626  "%s:"
627  " No aggregate receive filters were applied...\n",
628  print_prefix);
629  }
630 
632  "%s:"
633  " Locking the context to apply updates.\n",
634  print_prefix);
635 
636  {
637  knowledge::ContextGuard guard(context);
638 
640  "%s:"
641  " Applying updates to context.\n",
642  print_prefix);
643 
644  uint64_t now = utility::get_time();
645  // apply updates from the update list
646  for(knowledge::KnowledgeMap::iterator i = updates.begin();
647  i != updates.end(); ++i)
648  {
649  const auto apply = [&](knowledge::KnowledgeRecord& record) {
650  int result = 0;
651 
652  record.set_toi(now);
653  result = record.apply(
654  context, i->first, header->quality, header->clock, false);
655  ++actual_updates;
656 
657  if(result != 1)
658  {
660  "%s:"
661  " update %s=%s was rejected\n",
662  print_prefix, key.c_str(), record.to_string().c_str());
663  }
664  else
665  {
667  "%s:"
668  " update %s=%s was accepted\n",
669  print_prefix, key.c_str(), record.to_string().c_str());
670  }
671  };
672 
673  auto iter = past_updates.find(i->first);
674  if(iter != past_updates.end())
675  {
676  for(auto& cur : iter->second)
677  {
678  if(cur.exists())
679  {
680  apply(cur);
681  }
682  }
683  }
684 
685  apply(i->second);
686  }
687  }
688 
689  context.set_changed();
690 
691  if(!dropped)
692  {
694 
696  "%s:"
697  " Applying rebroadcast filters to receive results.\n",
698  print_prefix);
699 
700  // create a list of rebroadcast records from the updates
701  for(knowledge::KnowledgeMap::iterator i = updates.begin();
702  i != updates.end(); ++i)
703  {
704  i->second =
705  settings.filter_rebroadcast(i->second, i->first, transport_context);
706 
707  if(i->second.exists())
708  {
709  if(i->second.to_string() != "")
710  {
712  "%s:"
713  " Filter results for key %s were %s\n",
714  print_prefix, i->first.c_str(), i->second.to_string().c_str());
715  }
716  rebroadcast_records[i->first] = i->second;
717  }
718  else
719  {
721  "%s:"
722  " Filter resulted in dropping %s\n",
723  print_prefix, i->first.c_str());
724  }
725  }
726 
727  const knowledge::KnowledgeMap& additionals =
728  transport_context.get_records();
729 
730  for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
731  i != additionals.end(); ++i)
732  {
733  rebroadcast_records[i->first] = i->second;
734  }
735 
737  "%s:"
738  " Applying aggregate rebroadcast filters to %d records.\n",
739  print_prefix, rebroadcast_records.size());
740 
741  // apply aggregate filters to the rebroadcast records
743  rebroadcast_records.size() > 0)
744  {
745  settings.filter_rebroadcast(rebroadcast_records, transport_context);
746  }
747  else
748  {
750  "%s:"
751  " No aggregate rebroadcast filters were applied...\n",
752  print_prefix);
753  }
754 
756  "%s:"
757  " Returning to caller with %d rebroadcast records.\n",
758  print_prefix, rebroadcast_records.size());
759  }
760  else
761  {
763  "%s:"
764  " Rebroadcast packet was dropped...\n",
765  print_prefix);
766  }
767 
768  // before we send to others, we first execute rules
769  if(settings.on_data_received_logic.length() != 0)
770  {
771 #ifndef _MADARA_NO_KARL_
773  "%s:"
774  " evaluating rules in %s\n",
775  print_prefix, settings.on_data_received_logic.c_str());
776 
777  context.evaluate(on_data_received);
778 #endif // _MADARA_NO_KARL_
779  }
780  else
781  {
783  "%s:"
784  " no permanent rules were set\n",
785  print_prefix);
786  }
787 
788  return actual_updates;
789 }
790 
792  int64_t& buffer_remaining, const QoSTransportSettings& settings,
793  const char* print_prefix, MessageHeader* header,
794  const knowledge::KnowledgeMap& records, PacketScheduler& packet_scheduler)
795 {
796  int result = 0;
797 
798  if(header->ttl > 0 && records.size() > 0 && packet_scheduler.add())
799  {
800  // keep track of the message_size portion of buffer
801  uint64_t* message_size = (uint64_t*)buffer;
802  int max_buffer_size = (int)buffer_remaining;
803 
804  // the number of updates will be the size of the records map
805  header->updates = uint32_t(records.size());
806 
807  // set the update to the end of the header
808  char* update = header->write(buffer, buffer_remaining);
809 
810  for(knowledge::KnowledgeMap::const_iterator i = records.begin();
811  i != records.end(); ++i)
812  {
813  update = i->second.write(update, i->first, buffer_remaining);
814  }
815 
816  if(buffer_remaining > 0)
817  {
818  int size = (int)(settings.queue_length - buffer_remaining);
819  *message_size = utility::endian_swap((uint64_t)size);
820 
822  "%s:"
823  " %" PRIu64 " bytes prepped for rebroadcast packet\n",
824  print_prefix, size);
825 
826  result = size;
827 
829  "%s:"
830  " calling encode filters\n",
831  print_prefix);
832 
833  settings.filter_encode(buffer, result, max_buffer_size);
834  }
835  else
836  {
838  "%s:"
839  " Not enough buffer for rebroadcasting packet\n",
840  print_prefix);
841 
842  result = -2;
843  }
844  }
845  else
846  {
848  "%s:"
849  " No rebroadcast necessary.\n",
850  print_prefix);
851 
852  result = -1;
853  }
854 
855  packet_scheduler.print_status(logger::LOG_DETAILED, print_prefix);
856 
857  return result;
858 }
859 
860 long Base::prep_send(const knowledge::KnowledgeMap& orig_updates,
861  const char* print_prefix)
862 {
863  // check to see if we are shutting down
864  long ret = this->check_transport();
865  if(-1 == ret)
866  {
868  "%s: transport has been told to shutdown", print_prefix);
869 
870  return ret;
871  }
872  else if(-2 == ret)
873  {
875  "%s: transport is not valid", print_prefix);
876 
877  return ret;
878  }
879 
880  // get the maximum quality from the updates
881  uint32_t quality = knowledge::max_quality(orig_updates);
882  uint64_t latest_toi = 0;
883  bool reduced = false;
884 
885  knowledge::KnowledgeMap filtered_updates;
886 
888  "%s:"
889  " Applying filters to %zu updates before sending...\n",
890  print_prefix, orig_updates.size());
891 
896 
897  bool dropped = false;
898 
900  {
901  dropped = true;
903  "%s:"
904  " Send monitor has detected violation of bandwidth limit."
905  " Dropping packet...\n",
906  print_prefix);
907  }
910  {
911  dropped = true;
913  "%s:"
914  " Receive monitor has detected violation of bandwidth limit."
915  " Dropping packet...\n",
916  print_prefix);
917  }
918 
919  if(!dropped && packet_scheduler_.add())
920  {
922  {
927  for(auto e : orig_updates)
928  {
930  "%s:"
931  " Calling filter chain of %s.\n",
932  print_prefix, e.first.c_str());
933 
934  const auto record = e.second;
935 
936  if(record.toi() > latest_toi)
937  {
938  latest_toi = record.toi();
939  }
940 
941  // filter the record according to the send filter chain
943  settings_.filter_send(record, e.first, transport_context);
944 
946  "%s:"
947  " Filter returned for %s.\n",
948  print_prefix, e.first.c_str());
949 
950  // allow updates of 0: keep the result if it exists, or if the original record itself did not exist. exists() probably isn't the right check here.
951  if(result.exists() || !record.exists())
952  {
954  "%s:"
955  " Adding record to update list.\n",
956  print_prefix);
957 
958  filtered_updates.emplace(std::make_pair(e.first, result));
959  }
960  else
961  {
963  "%s:"
964  " Filter removed record from update list.\n",
965  print_prefix);
966  }
967  }
968 
970  "%s:"
971  " Through individual record filters. Proceeding to add update "
972  "list.\n",
973  print_prefix);
974 
975  const knowledge::KnowledgeMap& additionals =
976  transport_context.get_records();
977 
978  for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
979  i != additionals.end(); ++i)
980  {
982  "%s:"
983  " Filter added a record %s to the update list.\n",
984  print_prefix, i->first.c_str());
985  filtered_updates.emplace(std::make_pair(i->first, i->second));
986  }
987  }
988  else
989  {
990  for(auto e : orig_updates)
991  {
992  const auto record = e.second;
993 
994  if(record.toi() > latest_toi)
995  {
996  latest_toi = record.toi();
997  }
998 
1000  "%s:"
1001  " Adding record %s to update list.\n",
1002  print_prefix, e.first.c_str());
1003 
1004  filtered_updates.emplace(std::make_pair(e.first, record));
1005 
1006  // Youtube tutorial is currently throwing this. Need to check GAMS
1007  // else
1008  // {
1009  // std::stringstream message;
1010  // message << print_prefix;
1011  // message << ": record " << e.first << " produced a null record ";
1012  // message << "from get_record_unsafe ()\n";
1013 
1014  // madara_logger_log(
1015  // context_.get_logger(), logger::LOG_ERROR, message.str().c_str());
1016 
1017  // throw exceptions::MemoryException(message.str());
1018  // }
1019  }
1020  }
1021  }
1022  else
1023  {
1025  "%s:"
1026  " Packet scheduler has dropped packet...\n",
1027  print_prefix);
1028 
1029  return 0;
1030  }
1031 
1033  "%s:"
1034  " Applying %d aggregate update send filters to %d updates...\n",
1035  print_prefix, (int)settings_.get_number_of_send_aggregate_filters(),
1036  (int)filtered_updates.size());
1037 
1038  // apply the aggregate filters
1040  filtered_updates.size() > 0)
1041  {
1042  settings_.filter_send(filtered_updates, transport_context);
1043  }
1044  else
1045  {
1047  "%s:"
1048  " No aggregate send filters were applied...\n",
1049  print_prefix);
1050  }
1051 
1053 
1055  "%s:"
1056  " Finished applying filters before sending...\n",
1057  print_prefix);
1058 
1059  if(filtered_updates.size() == 0)
1060  {
1062  "%s:"
1063  " Filters removed all data. Nothing to send.\n",
1064  print_prefix);
1065 
1066  return 0;
1067  }
1068 
1069  // use the transport's existing send buffer (buffer_) for the outgoing message
1070  char* buffer = buffer_.get_ptr();
1071  int64_t buffer_remaining = settings_.queue_length;
1072 
1073  if(buffer == 0)
1074  {
1076  "%s:"
1077  " Unable to allocate buffer of size %" PRIu32 ". Exiting thread.\n",
1078  print_prefix, settings_.queue_length);
1079 
1080  return -3;
1081  }
1082 
1083  // the header object is created separately and written to the beginning of the buffer further below
1084  MessageHeader* header = 0;
1085 
1087  {
1089  "%s:"
1090  " Preparing message with reduced message header.\n",
1091  print_prefix);
1092 
1093  header = new ReducedMessageHeader();
1094  reduced = true;
1095  }
1096  else
1097  {
1099  "%s:"
1100  " Preparing message with normal message header.\n",
1101  print_prefix);
1102 
1103  header = new MessageHeader();
1104  }
1105 
1106  // get the clock
1107  header->clock = context_.get_clock();
1108 
1109  if(!reduced)
1110  {
1111  // copy the domain from settings
1112  utility::strncpy_safe(header->domain, this->settings_.write_domain.c_str(),
1113  sizeof(header->domain));
1114 
1115  // set the header quality to the maximum quality found in the original updates
1116  header->quality = quality;
1117 
1118  // copy the message originator (our id)
1119  utility::strncpy_safe(header->originator, id_.c_str(), sizeof(header->originator));
1120 
1121  // send data is generally an assign type. However, MessageHeader is
1122  // flexible enough to support both, and this will simplify our read thread
1123  // handling
1124  header->type = MULTIASSIGN;
1125  }
1126 
1127  // set the time-to-live
1128  header->ttl = settings_.get_rebroadcast_ttl();
1129 
1130  header->updates = uint32_t(filtered_updates.size());
1131 
1132  // compute size of this header
1133  header->size = header->encoded_size();
1134 
1135  // keep track of the maximum buffer size for encoding
1136  int max_buffer_size = (int)buffer_remaining;
1137 
1138  // set the update to the end of the header
1139  char* update = header->write(buffer, buffer_remaining);
1140  uint64_t* message_size = (uint64_t*)buffer;
1141  uint32_t* message_updates = (uint32_t*)(buffer + 116);
1142 
1143  // Message header format
1144  // [size|id|domain|originator|type|updates|quality|clock|list of updates]
1145 
1159  // zero out the memory
1160  // memset(buffer, 0, MAX_PACKET_SIZE);
1161 
1162  // Message update format
1163  // [key|value]
1164 
1165  int j = 0;
1166  uint32_t actual_updates = 0;
1167  for(knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin();
1168  i != filtered_updates.end(); ++i)
1169  {
1170  const auto& key = i->first;
1171  const auto& rec = i->second;
1172  const auto do_write = [&](const knowledge::KnowledgeRecord& rec) {
1173  if(!rec.exists())
1174  {
1176  "%s:"
1177  " update[%d] => value is empty\n",
1178  print_prefix, j, key.c_str());
1179  return;
1180  }
1181 
1182  update = rec.write(update, key, buffer_remaining);
1183 
1184  if(buffer_remaining > 0)
1185  {
1187  "%s:"
1188  " update[%d] => encoding %s of type %" PRId32 " and size %" PRIu32
1189  " @%" PRIu64 "\n",
1190  print_prefix, j, key.c_str(), rec.type(), rec.size(), rec.toi());
1191  ++actual_updates;
1192  ++j;
1193  }
1194  else
1195  {
1197  "%s:"
1198  " unable to encode update[%d] => %s of type %" PRId32
1199  " and size %" PRIu32 "\n",
1200  print_prefix, j, key.c_str(), rec.type(), rec.size());
1201  }
1202  };
1203 
1204  if(!settings_.send_history || !rec.has_history())
1205  {
1206  do_write(rec);
1207  }
1208  else
1209  {
1210  auto buf = rec.share_circular_buffer();
1211  auto end = buf->end();
1212  auto cur = buf->begin();
1213 
1214  if(last_toi_sent_ > 0)
1215  {
1216  cur = std::upper_bound(cur, end, last_toi_sent_,
1217  [](uint64_t lhs, const knowledge::KnowledgeRecord& rhs) {
1218  return lhs < rhs.toi();
1219  });
1220  }
1221  for(; cur != end; ++cur)
1222  {
1223  do_write(*cur);
1224  }
1225  }
1226  }
1227 
1228  long size(0);
1229 
1230  if(buffer_remaining > 0)
1231  {
1232  size = (long)(settings_.queue_length - buffer_remaining);
1233  header->size = size;
1234  *message_size = utility::endian_swap((uint64_t)size);
1235  header->updates = actual_updates;
1236  *message_updates = utility::endian_swap(actual_updates);
1237 
1238  // before we send to others, we first execute rules
1239  if(settings_.on_data_received_logic.length() != 0)
1240  {
1241 #ifndef _MADARA_NO_KARL_
1242 
1244  "%s:"
1245  " evaluating rules in %s\n",
1246  print_prefix, settings_.on_data_received_logic.c_str());
1247 
1249 
1251  "%s:"
1252  " rules have been successfully evaluated\n",
1253  print_prefix);
1254 
1255 #endif // _MADARA_NO_KARL_
1256  }
1257  else
1258  {
1260  "%s:"
1261  " no permanent rules were set\n",
1262  print_prefix);
1263  }
1264  }
1265 
1267  "%s:"
1268  " calling encode filters\n",
1269  print_prefix);
1270 
1271  // buffer is ready for encoding
1272  size = (long)settings_.filter_encode(
1273  buffer_.get_ptr(), (int)size, max_buffer_size);
1274 
1276  "%s:"
1277  " header info before encode: %s\n",
1278  print_prefix, header->to_string().c_str());
1279 
1280  delete header;
1281 
1282  last_toi_sent_ = latest_toi;
1283 
1284  return size;
1285 }
1286 }
1287 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:21
const ThreadSafeContext * context_
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse tree.
Definition: Interpreter.h:43
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
Compiled, optimized KaRL logic.
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class encapsulates an entry in a KnowledgeBase.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
uint64_t clock
last modification lamport clock time
int apply(madara::knowledge::ThreadSafeContext &context, const std::string &key, unsigned int quality, uint64_t clock, bool perform_lock)
Apply the knowledge record to a context, given some quality and clock.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
uint32_t quality
priority of the update
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
int get_level(void)
Gets the maximum logging detail level.
Definition: Logger.inl:99
Provides monitoring capability of a transport's bandwidth.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void add(uint64_t size)
Adds a message to the monitor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:153
QoSTransportSettings settings_
Definition: Transport.h:132
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:135
int check_transport(void)
all subclasses should call this method at the beginning of send_data
Definition: Transport.inl:17
uint64_t last_toi_sent_
Latest TOI the previous send operation included.
Definition: Transport.h:156
const std::string id_
host:port identifier of this process
Definition: Transport.h:130
virtual int setup(void)
all subclasses should call this method at the end of their setup
Definition: Transport.cpp:32
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:144
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:150
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
Definition: Transport.cpp:13
long prep_send(const knowledge::KnowledgeMap &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:860
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:147
virtual ~Base()=0
Destructor.
Definition: Transport.cpp:30
virtual void close(void)
Closes this transport.
Definition: Transport.cpp:104
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:31
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
Definition: Transport.h:139
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:51
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
static bool fragment_message_header_test(const char *buffer)
Tests the buffer for a reduced message identifier.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
uint32_t quality
the quality of the message sender
uint64_t timestamp
the timestamp of the sender when the message was generated
char domain[32]
the domain that this message is intended for
char originator[64]
the originator of the message (host:port)
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining.
uint32_t updates
the number of knowledge variable updates in the message
uint32_t type
the type of message
static bool message_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
uint64_t clock
the clock of the sender when the message was generated
uint64_t size
the size of this header plus the updates
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining.
Provides scheduler for dropping packets.
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the status of the packet scheduler.
bool add(void)
Adds a message to the monitor.
void attach(const QoSTransportSettings *settings)
Attaches settings.
Container for quality-of-service settings.
size_t get_number_of_send_filtered_types(void) const
Returns the number of types that are filtered before send.
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving.
int filter_encode(char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
double get_deadline(void) const
Returns the latency deadline in seconds.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
int filter_decode(char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting.
Defines a simple, smaller message header of 29 bytes that supports less QoS.
static bool reduced_message_header_test(const char *buffer)
Tests the buffer for a reduced message identifier.
Provides context about the transport.
void set_operation(int64_t operation)
Sets the operation that the context is/should be performing.
void clear_records(void)
Clears records added through filtering operations.
const knowledge::KnowledgeMap & get_records(void) const
Returns the additional records stored in the context.
Holds basic transport settings.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
OriginatorFragmentMap fragment_map
Map of fragments received by originator.
void add_read_domain(const std::string &domain)
Adds a read domain to the list of domains to read from.
std::string on_data_received_logic
Logic to be evaluated after every successful update.
uint32_t queue_length
Length of the buffer used to store history of events.
bool send_history
if true, send all updates since last send, for records that have history enabled (if the history capa...
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
std::string write_domain
All class members are accessible to users for easy setup.
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
bool send_reduced_message_header
Send a reduced message header (clock, size, updates, KaRL id)
size_t num_read_domains(void) const
Returns the number of read domains.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
constexpr string_t string
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, uint64_t &total_size, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:791
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.inl:134
int64_t get_time(void)
Returns a time of day in nanoseconds. If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Definition: Utility.cpp:376
Copyright(c) 2020 Galois.