MADARA  3.2.3
Transport.cpp
Go to the documentation of this file.
1 #include "Transport.h"
2 
6 
7 #include <algorithm>
8 
9 namespace madara { namespace transport {
10 
11 Base::Base (const std::string & id,
12  TransportSettings & new_settings,
14  : is_valid_ (false), shutting_down_ (false), id_ (id),
15  settings_ (new_settings), context_ (context)
16 
17 #ifndef _MADARA_NO_KARL_
18  , on_data_received_ (context.get_logger ())
19 #endif // _MADARA_NO_KARL_
20 {
23 }
24 
26 {
27 }
28 
29 int
30 Base::setup (void)
31 {
32  // check for an on_data_received ruleset
33  if (settings_.on_data_received_logic.length () != 0)
34  {
36  "transport::Base::setup" \
37  " setting rules to %s\n",
39 
40 #ifndef _MADARA_NO_KARL_
41  expression::Interpreter interpreter;
42  on_data_received_ = interpreter.interpret (context_,
44 #endif // _MADARA_NO_KARL_
45  }
46  else
47  {
49  "transport::Base::setup" \
50  " no permanent rules were set\n");
51  }
52 
53  // setup the send buffer
54  if (settings_.queue_length > 0)
55  buffer_ = new char [settings_.queue_length];
56 
57  // if read domains has not been set, then set to write domain
58  if (settings_.num_read_domains () == 0)
59  {
61  "transport::Base::setup" \
62  " no read domains set. Adding write domain (%s)\n",
63  settings_.write_domain.c_str ());
64 
66  }
67  else
68  {
70  "transport::Base::setup" \
71  " settings configured with %d read domains\n",
73  }
74 
75  if (settings_.num_read_domains () > 0 &&
77  {
78  std::vector <std::string> domains;
79  settings_.get_read_domains (domains);
80 
81  std::stringstream buffer;
82 
83  for (unsigned int i = 0; i < domains.size (); ++i)
84  {
85  buffer << domains[i];
86 
87  if (i != domains.size () - 1)
88  {
89  buffer << ", ";
90  }
91  }
92 
94  "transport::Base::setup" \
95  " Write domain: %s. Read domains: %s\n",
96  settings_.write_domain.c_str (), buffer.str ().c_str ());
97  }
98 
99  return validate_transport ();
100 }
101 
102 void
104 {
106 }
107 
108 int
110  const char * buffer,
111  uint32_t bytes_read,
112  const std::string & id,
115  BandwidthMonitor & send_monitor,
116  BandwidthMonitor & receive_monitor,
117  knowledge::KnowledgeMap & rebroadcast_records,
118 #ifndef _MADARA_NO_KARL_
119  knowledge::CompiledExpression & on_data_received,
120 #endif // _MADARA_NO_KARL_
121 
122  const char * print_prefix,
123  const char * remote_host,
124  MessageHeader *& header)
125 {
126  // reset header to 0, so it is safe to delete
127  header = 0;
128 
129  int max_buffer_size = (int)bytes_read;
130 
131  // tell the receive bandwidth monitor about the transaction
132  receive_monitor.add (bytes_read);
133 
135  "%s:" \
136  " Receive bandwidth = %" PRIu64 " B/s\n",
137  print_prefix,
138  receive_monitor.get_bytes_per_second ());
139 
140  bool is_reduced = false;
141  bool is_fragment = false;
142 
144  "%s:" \
145  " calling decode filters on %" PRIu32 " bytes\n",
146  print_prefix, bytes_read);
147 
148  // call decodes, if applicable
149  bytes_read = (uint32_t)settings.filter_decode ((unsigned char *)buffer,
150  max_buffer_size, max_buffer_size);
151 
153  "%s:" \
154  " Decoding resulted in %" PRIu32 " final bytes\n",
155  print_prefix, bytes_read);
156 
157  // setup buffer remaining, used by the knowledge record read method
158  int64_t buffer_remaining = (int64_t)bytes_read;
159 
160  // clear the rebroadcast records
161  rebroadcast_records.clear ();
162 
163  // receive records will be what we pass to the aggregate filter
164  knowledge::KnowledgeMap updates;
165 
166  // check the buffer for a reduced message header
167  if (bytes_read >= ReducedMessageHeader::static_encoded_size () &&
169  {
171  "%s:" \
172  " processing reduced KaRL message from %s\n",
173  print_prefix,
174  remote_host);
175 
176  header = new ReducedMessageHeader ();
177  is_reduced = true;
178  }
179  else if (bytes_read >= MessageHeader::static_encoded_size () &&
181  {
183  "%s:" \
184  " processing KaRL message from %s\n",
185  print_prefix,
186  remote_host);
187 
188  header = new MessageHeader ();
189  }
190  else if (bytes_read >= FragmentMessageHeader::static_encoded_size () &&
192  {
194  "%s:" \
195  " processing KaRL fragment message from %s\n",
196  print_prefix,
197  remote_host);
198 
199  header = new FragmentMessageHeader ();
200  is_fragment = true;
201  }
202  else if (bytes_read >= 8 + MADARA_IDENTIFIER_LENGTH)
203  {
204  // get the text that appears as identifier.
205  char identifier[MADARA_IDENTIFIER_LENGTH];
206  strncpy (identifier, buffer + 8, MADARA_IDENTIFIER_LENGTH);
207  identifier[7] = 0;
208 
210  "%s:" \
211  " dropping non-KaRL message with id %s from %s\n",
212  print_prefix,
213  identifier,
214  remote_host);
215 
216  return -1;
217  }
218  else
219  {
221  "%s:" \
222  " dropping too short message from %s (length %i)\n",
223  print_prefix,
224  remote_host,
225  bytes_read);
226 
227  return -1;
228  }
229 
230  const char * update = header->read (buffer, buffer_remaining);
231 
233  "%s:" \
234  " header info: %s\n",
235  print_prefix, header->to_string ().c_str ());
236 
237  if (header->size < bytes_read)
238  {
240  "%s:" \
241  " Message header.size (%" PRIu64 " bytes) is less than actual"
242  " bytes read (%" PRIu32 " bytes). Dropping message.\n",
243  print_prefix, header->size, bytes_read);
244 
245  return -1;
246  }
247 
248  if (is_fragment &&
249  exists (header->originator, header->clock,
250  ((FragmentMessageHeader *)header)->update_number, settings.fragment_map))
251  {
253  "%s:" \
254  " Fragment already exists in fragment map. Dropping.\n",
255  print_prefix);
256 
257  return -1;
258  }
259 
260  if (!is_reduced)
261  {
262  // reject the message if it is us as the originator (no update necessary)
263  if (id == header->originator)
264  {
266  "%s:" \
267  " dropping message from ourself\n",
268  print_prefix);
269 
270  return -2;
271  }
272  else
273  {
275  "%s:" \
276  " remote id (%s) is not our own\n",
277  print_prefix,
278  remote_host);
279  }
280 
281  if (settings.is_trusted (remote_host))
282  {
284  "%s: remote id (%s) is trusted\n",
285  print_prefix,
286  remote_host);
287  }
288  else
289  {
291  "%s:" \
292  " dropping message from untrusted peer (%s\n",
293  print_prefix,
294  remote_host);
295 
296  // delete the header and continue to the svc loop
297  return -3;
298  }
299 
300  std::string originator (header->originator);
301 
302  if (settings.is_trusted (originator))
303  {
305  "%s:" \
306  " originator (%s) is trusted\n",
307  print_prefix,
308  originator.c_str ());
309  }
310  else
311  {
313  "%s:" \
314  " dropping message from untrusted originator (%s)\n",
315  print_prefix,
316  originator.c_str ());
317 
318  return -4;
319  }
320 
321  // reject the message if it is from a different domain
322  if (!settings.is_reading_domain (header->domain))
323  {
325  "%s:" \
326  " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
327  print_prefix,
328  remote_host,
329  header->domain);
330 
331  // delete the header and continue to the svc loop
332  return -5;
333  }
334  else
335  {
337  "%s:" \
338  " remote id (%s) message is in our domain\n",
339  print_prefix,
340  remote_host);
341  }
342  }
343 
344  // fragments are special cases
345  if (is_fragment)
346  {
347  // grab the fragment header
348  FragmentMessageHeader * frag_header =
349  dynamic_cast <FragmentMessageHeader *> (header);
350 
352  "%s:" \
353  " Processing fragment %" PRIu32 " of %s:%" PRIu64 ".\n",
354  print_prefix, frag_header->update_number,
355  frag_header->originator, frag_header->clock);
356 
357  // add the fragment and attempt to defrag the message
358  char * message = transport::add_fragment (
359  frag_header->originator, frag_header->clock,
360  frag_header->update_number, buffer, settings.fragment_queue_length,
361  settings.fragment_map, true);
362 
363  // if we have no return message, we may have previously defragged it
364  if (!message)
365  {
366  return 0;
367  }
368  else
369  {
371  "%s:" \
372  " Message has been pieced together from fragments. Processing...\n",
373  print_prefix);
374 
380  buffer_remaining = (int64_t)frag_header->get_size (message);
381  if (buffer_remaining <= settings.queue_length)
382  {
383  char * buffer_override = (char *)buffer;
384  memcpy (buffer_override, message, frag_header->get_size (message));
385 
386  // check the buffer for a reduced message header
388  {
390  "%s:" \
391  " processing reduced KaRL message from %s\n",
392  print_prefix,
393  remote_host);
394 
395  header = new ReducedMessageHeader ();
396  is_reduced = true;
397  update = header->read (buffer, buffer_remaining);
398  }
399  else if (MessageHeader::message_header_test (buffer))
400  {
402  "%s:" \
403  " processing KaRL message from %s\n",
404  print_prefix,
405  remote_host);
406 
407  header = new MessageHeader ();
408  update = header->read (buffer, buffer_remaining);
409  }
410 
411  delete [] message;
412  }
413  }
414  }
415 
416  int actual_updates = 0;
417  uint64_t current_time = utility::get_time ();
418  double deadline = settings.get_deadline ();
419  TransportContext transport_context (
421  receive_monitor.get_bytes_per_second (),
422  send_monitor.get_bytes_per_second (),
423  header->timestamp, current_time,
424  header->domain, header->originator,
425  remote_host);
426 
427  uint64_t latency (0);
428 
429  if (deadline >= 1.0)
430  {
431  if (header->timestamp < current_time)
432  {
433  latency = current_time - header->timestamp;
434 
435  if (latency > deadline)
436  {
438  "%s:" \
439  " deadline violation (latency is %" PRIu64 ", deadline is %f).\n",
440  print_prefix,
441  latency, deadline);
442 
443  return -6;
444  }
445  }
446  else
447  {
449  "%s:" \
450  " Cannot compute message latency." \
451  " Message header timestamp is in the future." \
452  " message.timestamp = %" PRIu64 ", cur_timestamp = %" PRIu64 ".\n",
453  print_prefix,
454  header->timestamp, current_time);
455  }
456  }
457 
459  "%s:" \
460  " iterating over the %" PRIu32 " updates\n",
461  print_prefix,
462  header->updates);
463 
464  // temporary record for reading from the updates buffer
466  record.quality = header->quality;
467  record.clock = header->clock;
468  std::string key;
469 
470  bool dropped = false;
471 
472  if (send_monitor.is_bandwidth_violated (
473  settings.get_send_bandwidth_limit ()))
474  {
475  dropped = true;
477  "%s:" \
478  " Send monitor has detected violation of bandwidth limit." \
479  " Dropping packet from rebroadcast list\n", print_prefix);
480  }
481  else if (receive_monitor.is_bandwidth_violated (
482  settings.get_total_bandwidth_limit ()))
483  {
484  dropped = true;
486  "%s:" \
487  " Receive monitor has detected violation of bandwidth limit." \
488  " Dropping packet from rebroadcast list...\n", print_prefix);
489  }
490  else if (settings.get_participant_ttl () < header->ttl)
491  {
492  dropped = true;
494  "%s:" \
495  " Transport participant TTL is lower than header ttl." \
496  " Dropping packet from rebroadcast list...\n", print_prefix);
497  }
498 
500  "%s:" \
501  " Applying %" PRIu32 " updates\n", print_prefix, header->updates);
502 
503  // iterate over the updates
504  for (uint32_t i = 0; i < header->updates; ++i)
505  {
506  // read converts everything into host format from the update stream
507  update = record.read (update, key, buffer_remaining);
508 
509  if (buffer_remaining < 0)
510  {
512  "%s:" \
513  " unable to process message. Buffer remaining is negative." \
514  " Server is likely being targeted by custom KaRL tools.\n",
515  print_prefix);
516 
517  // we do not delete the header as this will be cleaned up later
518  break;
519  }
520  else
521  {
523  "%s:" \
524  " Applying receive filter to %s\n", print_prefix, key.c_str ());
525 
526  record = settings.filter_receive (record, key, transport_context);
527 
528  if (record.exists ())
529  {
531  "%s:" \
532  " Filter results for %s were %s\n", print_prefix,
533  key.c_str (), record.to_string ().c_str ());
534 
535  updates[key] = record;
536  }
537  else
538  {
540  "%s:" \
541  " Filter resulted in dropping %s\n", print_prefix, key.c_str ());
542  }
543  }
544  }
545 
546  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
547 
548  if (additionals.size () > 0)
549  {
551  "%s:" \
552  " %lld additional records being handled after receive.\n", print_prefix,
553  (long long)additionals.size ());
554 
555  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
556  i != additionals.end (); ++i)
557  {
558  updates[i->first] = i->second;
559  }
560 
561  transport_context.clear_records ();
562 
563  if (header->ttl < 2)
564  header->ttl = 2;
565 
566  // modify originator to indicate we are the originator of modifications
567  strncpy (header->originator, id.c_str (),
568  sizeof (header->originator) - 1);
569 
570  }
571 
572  // apply aggregate receive filters
573  if (settings.get_number_of_receive_aggregate_filters () > 0
574  && (updates.size () > 0 || header->type == transport::REGISTER))
575  {
577  "%s:" \
578  " Applying aggregate receive filters.\n", print_prefix);
579 
580 
581  settings.filter_receive (updates, transport_context);
582  }
583  else
584  {
586  "%s:" \
587  " No aggregate receive filters were applied...\n",
588  print_prefix);
589  }
590 
592  "%s:" \
593  " Locking the context to apply updates.\n", print_prefix);
594 
595  {
596  knowledge::ContextGuard guard (context);
597 
599  "%s:" \
600  " Applying updates to context.\n", print_prefix);
601 
602  // apply updates from the update list
603  for (knowledge::KnowledgeMap::iterator i = updates.begin ();
604  i != updates.end (); ++i)
605  {
606  int result = 0;
607 
608  result = i->second.apply (context, i->first, header->quality,
609  header->clock, false);
610  ++actual_updates;
611 
612  if (result != 1)
613  {
615  "%s:" \
616  " update %s=%s was rejected\n",
617  print_prefix,
618  key.c_str (), record.to_string ().c_str ());
619  }
620  else
621  {
623  "%s:" \
624  " update %s=%s was accepted\n",
625  print_prefix,
626  key.c_str (), record.to_string ().c_str ());
627  }
628  }
629  }
630 
631  context.set_changed ();
632 
633  if (!dropped)
634  {
635  transport_context.set_operation (
637 
639  "%s:" \
640  " Applying rebroadcast filters to receive results.\n", print_prefix);
641 
642  // create a list of rebroadcast records from the updates
643  for (knowledge::KnowledgeMap::iterator i = updates.begin ();
644  i != updates.end (); ++i)
645  {
646  i->second = settings.filter_rebroadcast (
647  i->second, i->first, transport_context);
648 
649  if (i->second.exists ())
650  {
651  if (i->second.to_string () != "")
652  {
654  "%s:" \
655  " Filter results for key %s were %s\n", print_prefix,
656  i->first.c_str (), i->second.to_string ().c_str ());
657  }
658  rebroadcast_records[i->first] = i->second;
659  }
660  else
661  {
663  "%s:" \
664  " Filter resulted in dropping %s\n", print_prefix,
665  i->first.c_str ());
666  }
667  }
668 
669  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
670 
671  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
672  i != additionals.end (); ++i)
673  {
674  rebroadcast_records[i->first] = i->second;
675  }
676 
678  "%s:" \
679  " Applying aggregate rebroadcast filters to %d records.\n",
680  print_prefix, rebroadcast_records.size ());
681 
682  // apply aggregate filters to the rebroadcast records
684  && rebroadcast_records.size () > 0)
685  {
686  settings.filter_rebroadcast (rebroadcast_records, transport_context);
687  }
688  else
689  {
691  "%s:" \
692  " No aggregate rebroadcast filters were applied...\n",
693  print_prefix);
694  }
695 
697  "%s:" \
698  " Returning to caller with %d rebroadcast records.\n",
699  print_prefix, rebroadcast_records.size ());
700 
701  }
702  else
703  {
705  "%s:" \
706  " Rebroadcast packet was dropped...\n",
707  print_prefix);
708  }
709 
710  // before we send to others, we first execute rules
711  if (settings.on_data_received_logic.length () != 0)
712  {
713 #ifndef _MADARA_NO_KARL_
715  "%s:" \
716  " evaluating rules in %s\n",
717  print_prefix,
718  settings.on_data_received_logic.c_str ());
719 
720  context.evaluate (on_data_received);
721 #endif // _MADARA_NO_KARL_
722 
723  }
724  else
725  {
727  "%s:" \
728  " no permanent rules were set\n",
729  print_prefix);
730  }
731 
732  return actual_updates;
733 }
734 
735 
736 int
739  char * buffer,
740  int64_t & buffer_remaining,
742  const char * print_prefix,
743  MessageHeader * header,
744  const knowledge::KnowledgeMap & records,
745  PacketScheduler & packet_scheduler)
746 {
747  int result = 0;
748 
749  if (header->ttl > 0 && records.size () > 0 && packet_scheduler.add ())
750  {
751  // keep track of the message_size portion of buffer
752  uint64_t * message_size = (uint64_t *)buffer;
753  int max_buffer_size = (int)buffer_remaining;
754 
755  // the number of updates will be the size of the records map
756  header->updates = uint32_t (records.size ());
757 
758  // set the update to the end of the header
759  char * update = header->write (buffer, buffer_remaining);
760 
761  for (knowledge::KnowledgeMap::const_iterator i = records.begin ();
762  i != records.end (); ++i)
763  {
764  update = i->second.write (update, i->first, buffer_remaining);
765  }
766 
767  if (buffer_remaining > 0)
768  {
769  int size = (int)(settings.queue_length - buffer_remaining);
770  *message_size = utility::endian_swap ((uint64_t)size);
771 
773  "%s:" \
774  " %" PRIu64 " bytes prepped for rebroadcast packet\n",
775  print_prefix, size);
776 
777  result = size;
778 
780  "%s:" \
781  " calling encode filters\n",
782  print_prefix);
783 
784  settings.filter_encode ((unsigned char *)buffer,
785  result, max_buffer_size);
786  }
787  else
788  {
790  "%s:" \
791  " Not enough buffer for rebroadcasting packet\n",
792  print_prefix);
793 
794  result = -2;
795  }
796  }
797  else
798  {
800  "%s:" \
801  " No rebroadcast necessary.\n",
802  print_prefix);
803 
804  result = -1;
805  }
806 
807  packet_scheduler.print_status (logger::LOG_DETAILED, print_prefix);
808 
809  return result;
810 }
811 
813  const knowledge::VariableReferenceMap & orig_updates,
814  const char * print_prefix)
815 {
816  // check to see if we are shutting down
817  long ret = this->check_transport ();
818  if (-1 == ret)
819  {
821  "%s: transport has been told to shutdown",
822  print_prefix);
823 
824  return ret;
825  }
826  else if (-2 == ret)
827  {
829  "%s: transport is not valid",
830  print_prefix);
831 
832  return ret;
833  }
834 
835  // get the maximum quality from the updates
836  uint32_t quality = knowledge::max_quality (orig_updates);
837  bool reduced = false;
838 
839  knowledge::KnowledgeMap filtered_updates;
840 
842  "%s:" \
843  " Applying filters before sending...\n",
844  print_prefix);
845 
849  (uint64_t) utility::get_time (), (uint64_t) utility::get_time (),
851  id_);
852 
853  bool dropped = false;
854 
857  {
858  dropped = true;
860  "%s:" \
861  " Send monitor has detected violation of bandwidth limit." \
862  " Dropping packet...\n", print_prefix);
863  }
866  {
867  dropped = true;
869  "%s:" \
870  " Receive monitor has detected violation of bandwidth limit." \
871  " Dropping packet...\n", print_prefix);
872  }
873 
874  if (!dropped && packet_scheduler_.add ())
875  {
880  for (const auto &e : orig_updates)
881  {
883  "%s:" \
884  " Calling filter chain.\n", print_prefix);
885 
886  // filter the record according to the send filter chain
888  *e.second.get_record_unsafe (), e.first, transport_context);
889 
891  "%s:" \
892  " Filter returned.\n", print_prefix);
893 
894  if (result.exists ())
895  {
897  "%s:" \
898  " Adding record to update list.\n", print_prefix);
899 
900  filtered_updates[e.first] = result;
901  }
902  else
903  {
905  "%s:" \
906  " Filter removed record from update list.\n", print_prefix);
907  }
908  }
909 
910  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
911 
912  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
913  i != additionals.end (); ++i)
914  {
916  "%s:" \
917  " Filter added a record %s to the update list.\n",
918  print_prefix, i->first.c_str ());
919  filtered_updates[i->first] = i->second;
920  }
921  }
922  else
923  {
925  "%s:" \
926  " Packet scheduler has dropped packet...\n", print_prefix);
927 
928  return 0;
929  }
930 
932  "%s:" \
933  " Applying %d aggregate update send filters to %d updates...\n",
934  print_prefix, (int)settings_.get_number_of_send_aggregate_filters (),
935  (int)filtered_updates.size ());
936 
937  // apply the aggregate filters
939  filtered_updates.size () > 0)
940  {
941  settings_.filter_send (filtered_updates, transport_context);
942  }
943  else
944  {
946  "%s:" \
947  " No aggregate send filters were applied...\n",
948  print_prefix);
949  }
950 
952 
954  "%s:" \
955  " Finished applying filters before sending...\n",
956  print_prefix);
957 
958  if (filtered_updates.size () == 0)
959  {
961  "%s:" \
962  " Filters removed all data. Nothing to send.\n",
963  print_prefix);
964 
965  return 0;
966  }
967 
968  // allocate a buffer to send
969  char * buffer = buffer_.get_ptr ();
970  int64_t buffer_remaining = settings_.queue_length;
971 
972  if (buffer == 0)
973  {
975  "%s:" \
976  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
977  print_prefix,
979 
980  return -3;
981  }
982 
983 
984  // set the header to the beginning of the buffer
985  MessageHeader * header = 0;
986 
988  {
990  "%s:" \
991  " Preparing message with reduced message header.\n",
992  print_prefix);
993 
994  header = new ReducedMessageHeader ();
995  reduced = true;
996  }
997  else
998  {
1000  "%s:" \
1001  " Preparing message with normal message header.\n",
1002  print_prefix);
1003 
1004  header = new MessageHeader ();
1005  }
1006 
1007  // get the clock
1008  header->clock = context_.get_clock ();
1009 
1010  if (!reduced)
1011  {
1012  // copy the domain from settings
1013  strncpy (header->domain, this->settings_.write_domain.c_str (),
1014  sizeof (header->domain) - 1);
1015 
1016  // get the quality of the key
1017  header->quality = quality;
1018 
1019  // copy the message originator (our id)
1020  strncpy (header->originator, id_.c_str (), sizeof (header->originator) - 1);
1021 
1022  // send data is generally an assign type. However, MessageHeader is
1023  // flexible enough to support both, and this will simply our read thread
1024  // handling
1025  header->type = MULTIASSIGN;
1026 
1027  }
1028 
1029  // set the time-to-live
1030  header->ttl = settings_.get_rebroadcast_ttl ();
1031 
1032  header->updates = uint32_t (filtered_updates.size ());
1033 
1034  // compute size of this header
1035  header->size = header->encoded_size ();
1036 
1037  // keep track of the maximum buffer size for encoding
1038  int max_buffer_size = (int)buffer_remaining;
1039 
1040  // set the update to the end of the header
1041  char * update = header->write (buffer, buffer_remaining);
1042  uint64_t * message_size = (uint64_t *)buffer;
1043 
1044  // Message header format
1045  // [size|id|domain|originator|type|updates|quality|clock|list of updates]
1046 
1060  // zero out the memory
1061  //memset(buffer, 0, MAX_PACKET_SIZE);
1062 
1063  // Message update format
1064  // [key|value]
1065 
1066  int j = 0;
1067  for (knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin ();
1068  i != filtered_updates.end (); ++i, ++j)
1069  {
1070  update = i->second.write (update, i->first, buffer_remaining);
1071 
1072  if (buffer_remaining > 0)
1073  {
1075  "%s:" \
1076  " update[%d] => encoding %s of type %" PRId32 " and size %" PRIu32 "\n",
1077  print_prefix,
1078  j, i->first.c_str (), i->second.type (), i->second.size ());
1079  }
1080  else
1081  {
1083  "%s:" \
1084  " unable to encode update[%d] => %s of type %"
1085  PRId32 " and size %" PRIu32 "\n",
1086  print_prefix,
1087  j, i->first.c_str (), i->second.type (), i->second.size ());
1088  }
1089  }
1090 
1091  long size (0);
1092 
1093  if (buffer_remaining > 0)
1094  {
1095  size = (long)(settings_.queue_length - buffer_remaining);
1096  *message_size = utility::endian_swap ((uint64_t)size);
1097 
1098  // before we send to others, we first execute rules
1099  if (settings_.on_data_received_logic.length () != 0)
1100  {
1101 #ifndef _MADARA_NO_KARL_
1102 
1104  "%s:" \
1105  " evaluating rules in %s\n",
1106  print_prefix,
1108 
1110 
1112  "%s:" \
1113  " rules have been successfully evaluated\n",
1114  print_prefix);
1115 
1116 #endif // _MADARA_NO_KARL_
1117 
1118  }
1119  else
1120  {
1122  "%s:" \
1123  " no permanent rules were set\n",
1124  print_prefix);
1125  }
1126  }
1127 
1129  "%s:" \
1130  " calling encode filters\n",
1131  print_prefix);
1132 
1133  // buffer is ready encoding
1134  size = (long)settings_.filter_encode ((unsigned char *)buffer_.get_ptr (),
1135  (int)size, max_buffer_size);
1136 
1138  "%s:" \
1139  " header info before encode: %s\n",
1140  print_prefix, header->to_string ().c_str ());
1141 
1142  delete header;
1143 
1144  return size;
1145 }
1146 
1147 } }
This class encapsulates an entry in a KnowledgeBase.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
Definition: Transport.h:140
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
Definition: Transport.cpp:11
OriginatorFragmentMap fragment_map
map of fragments received by originator
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the number of status of the packet scheduler.
QoSTransportSettings settings_
Definition: Transport.h:133
int filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void add_read_domain(const std::string domain)
Adds a read domain to the list of domains to read from.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:33
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
uint32_t quality
priority of the update
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
uint32_t updates
the number of knowledge variable updates in the message
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:253
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
Holds basic transport settings.
Compiled, optimized KaRL logic.
double get_deadline(void) const
Returns the latency deadline in seconds.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:48
Provides context about the transport.
uint32_t type
the type of message
bool add(void)
Adds a message to the monitor.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:148
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
int get_level(void)
Gets the maximum logging detail level.
Definition: Logger.inl:66
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
size_t num_read_domains(void) const
Returns the number of read domains.
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:23
static bool fragment_message_header_test(const char *buffer)
Tests the buffer for a reduced message identifier.
static struct madara::knowledge::tags::string_t string
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.inl:115
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
std::string write_domain
All class members are accessible to users for easy setup.
uint64_t size
the size of this header plus the updates
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
void add(uint64_t size)
Adds a message to the monitor.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
static bool reduced_message_header_test(const char *buffer)
Tests the buffer for a reduced message identifier.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send
void attach(const QoSTransportSettings *settings)
Attaches settings.
const std::string id_
host:port identifier of this process
Definition: Transport.h:131
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
uint64_t timestamp
the timestamp of the sender when the message was generated
std::string on_data_received_logic
logic to be evaluated after every successful update
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
virtual void close(void)
Closes this transport.
Definition: Transport.cpp:103
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
int filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
Provides monitoring capability of a transport&#39;s bandwidth.
static bool message_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send&#39;s filter chain.
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:21
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
uint64_t clock
the clock of the sender when the message was generated
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:151
Defines a simple, smaller message header of 29 bytes that supports less QoS.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:154
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:30
virtual ~Base()=0
Destructor.
Definition: Transport.cpp:25
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:812
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting @ return the number of aggregat...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:737
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:145
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:136
uint32_t quality
the quality of the message sender
int check_transport(void)
all subclasses should call this method at the beginning of send_data
Definition: Transport.inl:18
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent&#39;s m...
TransportSettings & settings(void)
Getter for the transport settings.
Definition: Transport.inl:43
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...