MADARA  3.4.1
QoSTransportSettings.cpp
Go to the documentation of this file.
6 
7 namespace logger = madara::logger;
10 
13  rebroadcast_ttl_(0),
14  participant_rebroadcast_ttl_(0),
15  trusted_peers_(),
16  banned_peers_(),
17  packet_drop_rate_(0.0),
18  packet_drop_burst_(1),
19  max_send_bandwidth_(-1),
20  max_total_bandwidth_(-1),
21  deadline_(-1)
22 {
23 }
24 
26  const QoSTransportSettings& settings)
27  : TransportSettings(settings),
28  rebroadcast_ttl_(settings.rebroadcast_ttl_),
29  participant_rebroadcast_ttl_(settings.participant_rebroadcast_ttl_),
30  trusted_peers_(settings.trusted_peers_),
31  banned_peers_(settings.banned_peers_),
32  rebroadcast_filters_(settings.rebroadcast_filters_),
33  receive_filters_(settings.receive_filters_),
34  send_filters_(settings.send_filters_),
35  buffer_filters_(settings.buffer_filters_),
36  packet_drop_rate_(settings.packet_drop_rate_),
37  packet_drop_type_(settings.packet_drop_type_),
38  packet_drop_burst_(settings.packet_drop_burst_),
39  max_send_bandwidth_(settings.max_send_bandwidth_),
40  max_total_bandwidth_(settings.max_total_bandwidth_),
41  deadline_(settings.deadline_)
42 {
43 }
44 
46  const TransportSettings& settings)
47  : TransportSettings(settings),
48  rebroadcast_ttl_(0),
49  participant_rebroadcast_ttl_(0),
50  trusted_peers_(),
51  banned_peers_(),
52  packet_drop_rate_(0.0),
53  max_send_bandwidth_(-1),
54  max_total_bandwidth_(-1),
55  deadline_(-1)
56 {
57  const QoSTransportSettings* rhs =
58  dynamic_cast<const QoSTransportSettings*>(&settings);
59  if(rhs)
60  {
74  deadline_ = rhs->deadline_;
75  }
76  else
77  {
78  TransportSettings* lhs = dynamic_cast<TransportSettings*>(this);
79  *lhs = settings;
80  }
81 }
82 
84 {
85  // need to clean up functors
86 }
87 
89  const QoSTransportSettings& rhs)
90 {
91  if(this != &rhs)
92  {
93  TransportSettings* lhs_base = (TransportSettings*)this;
94  TransportSettings* rhs_base = (TransportSettings*)&rhs;
95 
96  *lhs_base = *rhs_base;
97 
98  rebroadcast_ttl_ = rhs.rebroadcast_ttl_;
99  participant_rebroadcast_ttl_ = rhs.participant_rebroadcast_ttl_;
100  trusted_peers_ = rhs.trusted_peers_;
101  banned_peers_ = rhs.banned_peers_;
102  send_filters_ = rhs.send_filters_;
103  receive_filters_ = rhs.receive_filters_;
104  buffer_filters_ = rhs.buffer_filters_;
105  rebroadcast_filters_ = rhs.rebroadcast_filters_;
106  packet_drop_rate_ = rhs.packet_drop_rate_;
107  packet_drop_type_ = rhs.packet_drop_type_;
108  packet_drop_burst_ = rhs.packet_drop_burst_;
109  max_send_bandwidth_ = rhs.max_send_bandwidth_;
110  max_total_bandwidth_ = rhs.max_total_bandwidth_;
111  deadline_ = rhs.deadline_;
112  }
113 }
114 
116  const TransportSettings& rhs)
117 {
118  if(this != &rhs)
119  {
120  rebroadcast_ttl_ = 0;
121  participant_rebroadcast_ttl_ = 0;
122  trusted_peers_.clear();
123  banned_peers_.clear();
124  send_filters_.clear(knowledge::KnowledgeRecord::ALL_TYPES);
125  receive_filters_.clear(knowledge::KnowledgeRecord::ALL_TYPES);
126  rebroadcast_filters_.clear(knowledge::KnowledgeRecord::ALL_TYPES);
127  packet_drop_rate_ = 0.0;
128  packet_drop_type_ = PACKET_DROP_PROBABLISTIC;
129  packet_drop_burst_ = 1;
130  max_send_bandwidth_ = -1;
131  max_total_bandwidth_ = -1;
132  deadline_ = -1;
133 
134  TransportSettings* lhs_base = (TransportSettings*)this;
135  TransportSettings* rhs_base = (TransportSettings*)&rhs;
136 
137  *lhs_base = *rhs_base;
138  }
139 }
140 
142  unsigned char ttl)
143 {
144  rebroadcast_ttl_ = ttl;
145 }
146 
148  void) const
149 {
150  return rebroadcast_ttl_;
151 }
152 
154  unsigned char ttl)
155 {
156  participant_rebroadcast_ttl_ = ttl;
157 }
158 
160  void) const
161 {
162  return participant_rebroadcast_ttl_;
163 }
164 
166  const std::string& peer)
167 {
168  banned_peers_.erase(peer);
169  trusted_peers_[peer] = 1;
170 }
171 
173  const std::string& peer)
174 {
175  trusted_peers_.erase(peer);
176  banned_peers_[peer] = 1;
177 }
178 
180  const std::string& peer)
181 {
182  bool condition = false;
183  if(trusted_peers_.find(peer) != trusted_peers_.end())
184  {
185  trusted_peers_.erase(peer);
186  condition = true;
187  }
188  return condition;
189 }
190 
192  const std::string& peer)
193 {
194  bool condition = false;
195  if(banned_peers_.find(peer) != banned_peers_.end())
196  {
197  banned_peers_.erase(peer);
198  condition = true;
199  }
200  return condition;
201 }
202 
204  const std::string& peer) const
205 {
206  bool condition = false;
207 
215  if(trusted_peers_.size() == 0)
216  {
217  if(banned_peers_.find(peer) == banned_peers_.end())
218  condition = true;
219  }
220  else
221  {
222  condition = (trusted_peers_.find(peer) != trusted_peers_.end());
223  }
224 
225  return condition;
226 }
227 
229  uint32_t types, madara::knowledge::KnowledgeRecord (*function)(
231 {
232  send_filters_.add(types, function);
233 }
234 
236  uint32_t types, filters::RecordFilter* functor)
237 {
238  send_filters_.add(types, functor);
239 }
240 
243 {
244  send_filters_.add(function);
245 }
246 
248  filters::AggregateFilter* functor)
249 {
250  send_filters_.add(functor);
251 }
252 
254  filters::AggregateFilter* functor)
255 {
256  receive_filters_.add(functor);
257 }
258 
260  filters::BufferFilter* functor)
261 {
262  buffer_filters_.push_back(functor);
263 }
264 
266  uint32_t types, madara::knowledge::KnowledgeRecord (*function)(
268 {
269  receive_filters_.add(types, function);
270 }
271 
273  uint32_t types, filters::RecordFilter* functor)
274 {
275  receive_filters_.add(types, functor);
276 }
277 
279  void (*function)(knowledge::KnowledgeMap&, const TransportContext&,
281 {
282  receive_filters_.add(function);
283 }
284 
286  uint32_t types, madara::knowledge::KnowledgeRecord (*function)(
288 {
289  rebroadcast_filters_.add(types, function);
290 }
291 
293  uint32_t types, filters::RecordFilter* functor)
294 {
295  rebroadcast_filters_.add(types, functor);
296 }
297 
299  void (*function)(knowledge::KnowledgeMap&, const TransportContext&,
301 {
302  rebroadcast_filters_.add(function);
303 }
304 
306  filters::AggregateFilter* functor)
307 {
308  rebroadcast_filters_.add(functor);
309 }
310 
311 #ifdef _MADARA_JAVA_
312 
314  uint32_t types, jobject& object)
315 {
317  "QoSTransportSettings::add: "
318  "Adding Java record filter to receive queue\n");
319 
320  receive_filters_.add(types, object);
321 }
322 
324  uint32_t types, jobject& object)
325 {
326  send_filters_.add(types, object);
327 }
328 
330  uint32_t types, jobject& object)
331 {
332  rebroadcast_filters_.add(types, object);
333 }
334 
336  jobject& object)
337 {
338  receive_filters_.add(object);
339 }
340 
342 {
343  send_filters_.add(object);
344 }
345 
347  jobject& object)
348 {
349  rebroadcast_filters_.add(object);
350 }
351 
352 #endif
353 
354 #ifdef _MADARA_PYTHON_CALLBACKS_
355 
357  uint32_t types, boost::python::object& object)
358 {
359  receive_filters_.add(types, object);
360 }
361 
363  uint32_t types, boost::python::object& object)
364 {
365  send_filters_.add(types, object);
366 }
367 
369  uint32_t types, boost::python::object& object)
370 {
371  rebroadcast_filters_.add(types, object);
372 }
373 
375  boost::python::object& object)
376 {
377  receive_filters_.add(object);
378 }
379 
381  boost::python::object& object)
382 {
383  send_filters_.add(object);
384 }
385 
387  boost::python::object& object)
388 {
389  rebroadcast_filters_.add(object);
390 }
391 
392 #endif
393 
396 {
397  send_filters_.attach(context);
398  receive_filters_.attach(context);
399  rebroadcast_filters_.attach(context);
400 }
401 
403 {
404  send_filters_.clear(types);
405 }
406 
408 {
409  send_filters_.clear_aggregate_filters();
410 }
411 
413  uint32_t types)
414 {
415  receive_filters_.clear(types);
416 }
417 
419 {
420  receive_filters_.clear_aggregate_filters();
421 }
422 
424  uint32_t types)
425 {
426  rebroadcast_filters_.clear(types);
427 }
428 
430 {
431  buffer_filters_.clear();
432 }
433 
436 {
437  rebroadcast_filters_.clear_aggregate_filters();
438 }
439 
442  const madara::knowledge::KnowledgeRecord& input, const std::string& name,
443  transport::TransportContext& context) const
444 {
445  return send_filters_.filter(input, name, context);
446 }
447 
449  knowledge::KnowledgeMap& records,
450  const TransportContext& transport_context) const
451 {
452  send_filters_.filter(records, transport_context);
453 }
454 
456  char* source, int size, int max_size) const
457 {
458  // encode from front to back
459  for(filters::BufferFilters::const_iterator i = buffer_filters_.begin();
460  i != buffer_filters_.end(); ++i)
461  {
463  "QoSTransportSettings::filter_encode: size before encode: "
464  " %d of %d\n",
465  size, max_size);
466 
467  size = (*i)->encode(source, size, max_size);
468 
470  "QoSTransportSettings::filter_encode: size after encode: "
471  " %d of %d\n",
472  size, max_size);
473 
474  if(max_size > size + 20)
475  {
476  memmove(source + 20, source, size);
477 
479  header.read(*i);
480  header.size = (uint64_t)size;
481 
482  int64_t buffer_remaining = 20;
483 
484  header.write((char*)source, buffer_remaining);
485 
487 
489  "QoSTransportSettings::filter_encode: header: "
490  "%s:%s within size %d\n",
491  header.id, utility::to_string_version(header.version).c_str(), size);
492  }
493  else
494  {
495  std::stringstream buffer;
496  buffer << "QoSTransportSettings::filter_encode: ";
497  buffer << (size + 20) << " 20 byte size encoding cannot fit in ";
498  buffer << max_size << " byte buffer\n";
499 
500  throw exceptions::MemoryException(buffer.str());
501  }
502  }
503 
504  return size;
505 }
506 
508  char* source, int size, int max_size) const
509 {
510  // if we don't have buffer filters, do a check to see if we should
512  int64_t buffer_size = (int64_t)filters::BufferFilterHeader::encoded_size();
513 
514  header.read((char*)source, buffer_size);
515 
516  // if this is a fragment, the decode needs to be run after defrag
517  if(utility::begins_with (header.id, "KFRG"))
518  {
520  "QoSTransportSettings::filter_decode: header: "
521  " Detected %s. decode has to be called on defragged buffer.\n",
522  header.id);
523 
524  return size;
525  }
526  else
527  {
529  "QoSTransportSettings::filter_decode: header: "
530  " Detected %s\n",
531  header.id);
532  }
533 
534  if(buffer_filters_.size() == 0)
535  {
536  // id is either karl or KaRL. If it's anything else, then error
537  if(utility::begins_with (header.id, "karl") ||
538  utility::begins_with (header.id, "KaRL"))
539  {
541  "QoSTransportSettings::filter_decode: header: "
542  " Detected %s\n",
543  header.id);
544  }
545  else
546  {
548  "QoSTransportSettings::filter_decode: header: "
549  " Detected %s, which is not a message or checkpoint header\n",
550  header.id);
551 
552  return 0;
553  }
554  }
555 
556  // decode from back to front
557  for(filters::BufferFilters::const_reverse_iterator i =
558  buffer_filters_.rbegin();
559  i != buffer_filters_.rend(); ++i)
560  {
562  {
563  buffer_size = (int64_t)filters::BufferFilterHeader::encoded_size();
564 
565  header.read((char*)source, buffer_size);
566 
567  if(header.size > (uint64_t)max_size)
568  {
570  "QoSTransportSettings::filter_decode: header: "
571  " %d byte size encoding cannot fit in %d byte buffer\n",
572  (int)header.size, max_size);
573 
574  return 0;
575  }
576 
578  "QoSTransportSettings::filter_decode: header: "
579  " %s:%s\n",
580  header.id, utility::to_string_version(header.version).c_str());
581 
582  if(*i == 0)
583  {
585  "QoSTransportSettings::filter_decode: filter is null somehow\n");
586 
587  return 0;
588  }
589  else
590  {
592  "QoSTransportSettings::filter_decode: filter is not null\n");
593  }
594 
595  if(header.check_filter(*i))
596  {
598  "QoSTransportSettings::filter_decode: buffer filter %s is a "
599  "match\n",
600  header.id);
601  }
602  else
603  {
605  "QoSTransportSettings::filter_decode: buffer filter %s doesn't "
606  "match."
607  " Returning 0.\n",
608  header.id);
609 
610  return 0;
611  }
612 
614  "QoSTransportSettings::filter_decode: size before decode: "
615  " %d of %d (header.size=%d)\n",
616  size, max_size, (int)header.size);
617 
618  size = (*i)->decode(source + filters::BufferFilterHeader::encoded_size(),
619  (int)header.size, max_size);
620 
622  "QoSTransportSettings::filter_decode: size after decode: "
623  " %d of %d (header.size=%d)\n",
624  size, max_size, (int)header.size);
625 
626  if(size > 0)
627  {
628  memmove(
629  source, source + filters::BufferFilterHeader::encoded_size(), size);
630  }
631  }
632  else
633  {
635  "QoSTransportSettings::filter_decode: "
636  " %d byte size encoding cannot fit in %d byte buffer\n",
637  size, max_size);
638 
639  return 0;
640  } // end if size is bigger than the buffer header
641  } // end for loop iteration of filters
642 
643  return size;
644 }
645 
648  const madara::knowledge::KnowledgeRecord& input, const std::string& name,
649  transport::TransportContext& context) const
650 {
651  return receive_filters_.filter(input, name, context);
652 }
653 
655  knowledge::KnowledgeMap& records,
656  const transport::TransportContext& transport_context) const
657 {
658  receive_filters_.filter(records, transport_context);
659 }
660 
663  const madara::knowledge::KnowledgeRecord& input, const std::string& name,
664  transport::TransportContext& context) const
665 {
666  return rebroadcast_filters_.filter(input, name, context);
667 }
668 
670  knowledge::KnowledgeMap& records,
671  const transport::TransportContext& transport_context) const
672 {
673  rebroadcast_filters_.filter(records, transport_context);
674 }
675 
677 {
678  send_filters_.print_num_filters();
679 }
680 
682  void) const
683 {
684  receive_filters_.print_num_filters();
685 }
686 
688  void) const
689 {
690  rebroadcast_filters_.print_num_filters();
691 }
692 
693 size_t
695  void) const
696 {
697  return send_filters_.get_number_of_filtered_types();
698 }
699 
700 size_t
702  void) const
703 {
704  return send_filters_.get_number_of_aggregate_filters();
705 }
706 
709 {
710  return rebroadcast_filters_.get_number_of_filtered_types();
711 }
712 
715 {
716  return rebroadcast_filters_.get_number_of_aggregate_filters();
717 }
718 
719 size_t
721  void) const
722 {
723  return receive_filters_.get_number_of_filtered_types();
724 }
725 
728 {
729  return receive_filters_.get_number_of_aggregate_filters();
730 }
731 
733  void) const
734 {
735  return buffer_filters_.size();
736 }
737 
739  double drop_rate, int drop_type, uint64_t drop_burst)
740 {
741  packet_drop_rate_ = drop_rate;
742  packet_drop_type_ = drop_type;
743  packet_drop_burst_ = drop_burst;
744 }
745 
747 {
748  return packet_drop_rate_;
749 }
750 
752 {
753  return packet_drop_type_;
754 }
755 
757 {
758  return packet_drop_burst_;
759 }
760 
762  int64_t send_bandwidth)
763 {
764  max_send_bandwidth_ = send_bandwidth;
765 }
766 
768  void) const
769 {
770  return max_send_bandwidth_;
771 }
772 
774  int64_t total_bandwidth)
775 {
776  max_total_bandwidth_ = total_bandwidth;
777 }
778 
780  void) const
781 {
782  return max_total_bandwidth_;
783 }
784 
786 {
787  deadline_ = deadline;
788 }
789 
791 {
792  return deadline_;
793 }
794 
796  const std::string& filename, const std::string& prefix)
797 {
798  TransportSettings::load(filename, prefix);
799 
801  knowledge.load_context(filename);
802 
803  containers::Map trusted_peers(prefix + ".trusted_peers", knowledge);
804  containers::Map banned_peers(prefix + ".banned_peers", knowledge);
805 
806  rebroadcast_ttl_ =
807  (unsigned char)knowledge.get(prefix + ".rebroadcast_ttl").to_integer();
808  participant_rebroadcast_ttl_ =
809  (unsigned char)knowledge.get(prefix + ".participant_rebroadcast_ttl")
810  .to_integer();
811 
812  std::vector<std::string> trusted_keys, banned_keys;
813 
814  trusted_peers.keys(trusted_keys);
815  banned_peers.keys(banned_keys);
816 
817  for(size_t i = 0; i < trusted_keys.size(); ++i)
818  {
819  trusted_peers_[trusted_keys[i]] = 1;
820  }
821 
822  for(size_t i = 0; i < banned_keys.size(); ++i)
823  {
824  banned_peers_[banned_keys[i]] = 1;
825  }
826 
828 
829  value = knowledge.get(prefix + ".packet_drop_rate");
830  if (value.exists())
831  {
832  packet_drop_rate_ = (uint32_t)value.to_double();
833  }
834 
835  value = knowledge.get(prefix + ".packet_drop_type");
836  if (value.exists())
837  {
838  packet_drop_type_ = (int)value.to_integer();
839  }
840 
841  value = knowledge.get(prefix + ".packet_drop_burst");
842  if (value.exists())
843  {
844  packet_drop_burst_ = (uint64_t)value.to_integer();
845  }
846 
847  value = knowledge.get(prefix + ".max_send_bandwidth");
848  if (value.exists())
849  {
850  max_send_bandwidth_ = (int64_t)value.to_integer();
851  }
852 
853  value = knowledge.get(prefix + ".max_total_bandwidth");
854  if (value.exists())
855  {
856  max_total_bandwidth_ = (int64_t)value.to_integer();
857  }
858 
859  value = knowledge.get(prefix + ".deadline");
860  if (value.exists())
861  {
862  deadline_ = value.to_double();
863  }
864 }
865 
867  const std::string& filename, const std::string& prefix)
868 {
869  TransportSettings::load_text(filename, prefix);
870 
872 
873 
874 #ifndef _MADARA_NO_KARL_
875 
876  knowledge.evaluate(madara::utility::file_to_string(filename));
877 
878 #endif // end karl support
879 
880  containers::Map trusted_peers(prefix + ".trusted_peers", knowledge);
881  containers::Map banned_peers(prefix + ".banned_peers", knowledge);
882 
883  rebroadcast_ttl_ =
884  (unsigned char)knowledge.get(prefix + ".rebroadcast_ttl").to_integer();
885  participant_rebroadcast_ttl_ =
886  (unsigned char)knowledge.get(prefix + ".participant_rebroadcast_ttl")
887  .to_integer();
888 
889  std::vector<std::string> trusted_keys, banned_keys;
890 
891  trusted_peers.keys(trusted_keys);
892  banned_peers.keys(banned_keys);
893 
894  for(size_t i = 0; i < trusted_keys.size(); ++i)
895  {
896  trusted_peers_[trusted_keys[i]] = 1;
897  }
898 
899  for(size_t i = 0; i < banned_keys.size(); ++i)
900  {
901  banned_peers_[banned_keys[i]] = 1;
902  }
903 
905 
906  value = knowledge.get(prefix + ".packet_drop_rate");
907  if (value.exists())
908  {
909  packet_drop_rate_ = (uint32_t)value.to_double();
910  }
911 
912  value = knowledge.get(prefix + ".packet_drop_type");
913  if (value.exists())
914  {
915  packet_drop_type_ = (int)value.to_integer();
916  }
917 
918  value = knowledge.get(prefix + ".packet_drop_burst");
919  if (value.exists())
920  {
921  packet_drop_burst_ = (uint64_t)value.to_integer();
922  }
923 
924  value = knowledge.get(prefix + ".max_send_bandwidth");
925  if (value.exists())
926  {
927  max_send_bandwidth_ = (int64_t)value.to_integer();
928  }
929 
930  value = knowledge.get(prefix + ".max_total_bandwidth");
931  if (value.exists())
932  {
933  max_total_bandwidth_ = (int64_t)value.to_integer();
934  }
935 
936  value = knowledge.get(prefix + ".deadline");
937  if (value.exists())
938  {
939  deadline_ = value.to_double();
940  }
941 }
942 
944  const std::string& filename, const std::string& prefix) const
945 {
946  // Save the underlying base settings first
947  TransportSettings::save(filename, prefix);
948 
949  // then load the savings
951  knowledge.load_context(filename);
952 
953  containers::Map trusted_peers(prefix + ".trusted_peers", knowledge);
954  containers::Map banned_peers(prefix + ".banned_peers", knowledge);
955 
956  knowledge.set(prefix + ".rebroadcast_ttl", Integer(rebroadcast_ttl_));
957  knowledge.set(prefix + ".participant_rebroadcast_ttl",
958  Integer(participant_rebroadcast_ttl_));
959 
960  for(std::map<std::string, int>::const_iterator i = trusted_peers_.begin();
961  i != trusted_peers_.end(); ++i)
962  {
963  trusted_peers.set(i->first, Integer(1));
964  }
965 
966  for(std::map<std::string, int>::const_iterator i = banned_peers_.begin();
967  i != banned_peers_.end(); ++i)
968  {
969  banned_peers.set(i->first, Integer(1));
970  }
971 
972  knowledge.set(prefix + ".packet_drop_rate", packet_drop_rate_);
973  knowledge.set(prefix + ".packet_drop_type", Integer(packet_drop_type_));
974  knowledge.set(prefix + ".packet_drop_burst", Integer(packet_drop_burst_));
975 
976  knowledge.set(prefix + ".max_send_bandwidth", Integer(max_send_bandwidth_));
977  knowledge.set(prefix + ".max_total_bandwidth", Integer(max_total_bandwidth_));
978  knowledge.set(prefix + ".deadline", deadline_);
979 
980  knowledge.save_context(filename);
981 }
982 
984  const std::string& filename, const std::string& prefix) const
985 {
986  // Save the underlying base settings first
987  TransportSettings::save_text(filename, prefix);
988 
989  // then load the savings
991 
992 #ifndef _MADARA_NO_KARL_
993 
994  knowledge.evaluate(madara::utility::file_to_string(filename));
995 
996 #endif // end karl support
997 
998 
999  containers::Map trusted_peers(prefix + ".trusted_peers", knowledge);
1000  containers::Map banned_peers(prefix + ".banned_peers", knowledge);
1001 
1002  knowledge.set(prefix + ".rebroadcast_ttl", Integer(rebroadcast_ttl_));
1003  knowledge.set(prefix + ".participant_rebroadcast_ttl",
1004  Integer(participant_rebroadcast_ttl_));
1005 
1006  for(std::map<std::string, int>::const_iterator i = trusted_peers_.begin();
1007  i != trusted_peers_.end(); ++i)
1008  {
1009  trusted_peers.set(i->first, Integer(1));
1010  }
1011 
1012  for(std::map<std::string, int>::const_iterator i = banned_peers_.begin();
1013  i != banned_peers_.end(); ++i)
1014  {
1015  banned_peers.set(i->first, Integer(1));
1016  }
1017 
1018  knowledge.set(prefix + ".packet_drop_rate", packet_drop_rate_);
1019  knowledge.set(prefix + ".packet_drop_type", Integer(packet_drop_type_));
1020  knowledge.set(prefix + ".packet_drop_burst", Integer(packet_drop_burst_));
1021 
1022  knowledge.set(prefix + ".max_send_bandwidth", Integer(max_send_bandwidth_));
1023  knowledge.set(prefix + ".max_total_bandwidth", Integer(max_total_bandwidth_));
1024  knowledge.set(prefix + ".deadline", deadline_);
1025 
1026  knowledge.save_as_karl(filename);
1027 }
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
madara::knowledge::KnowledgeRecord::Integer Integer
An exception for general memory errors like out-of-memory.
Abstract base class for implementing aggregate record filters via a functor interface.
Defines a buffer filter header.
uint64_t size
the size of this header plus the updates
static uint64_t encoded_size(void)
Returns the size of the encoded BufferFilterHeader class.
char * write(char *buffer, int64_t &buffer_remaining)
Writes a BufferFilterHeader instance to a buffer and updates the amount of buffer room remaining.
void read(filters::BufferFilter *filter)
Reads relevant fields from a filter.
bool check_filter(filters::BufferFilter *filter)
Checks compatibility between the header and the filter.
Abstract base class for implementing buffer filters via a functor interface.
Definition: BufferFilter.h:27
Abstract base class for implementing individual record filters via a functor interface.
Definition: RecordFilter.h:34
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
double to_double(void) const
converts the value to a float/double.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
Integer to_integer(void) const
converts the value to an integer.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides an interface for external functions into the MADARA KaRL variable settings.
Definition: Variables.h:53
This class stores a map of strings to KaRL variables.
Definition: Map.h:33
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
Definition: Map.cpp:582
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:501
Container for quality-of-service settings.
double packet_drop_rate_
Rate for dropping packets.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
size_t get_number_of_send_filtered_types(void) const
Returns the number of types that are filtered before send.
knowledge::KnowledgeRecordFilters receive_filters_
A container for receive filters.
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a text file.
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
void set_send_bandwidth_limit(int64_t bandwidth)
Sets a bandwidth limit for sending on this transport in bytes per sec.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
int filter_encode(char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
void add_trusted_peer(const std::string &peer)
Adds a trusted peer.
void set_total_bandwidth_limit(int64_t bandwidth)
Sets a bandwidth limit for receiving and sending over the transport.
void print_num_filters_rebroadcast(void) const
Prints the number of filters chained for each type to the rebroadcast filter.
void add_rebroadcast_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types after receiving and before rebroadcasting (if TTL...
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
unsigned char participant_rebroadcast_ttl_
This field is meant to limit the number of rebroadcasts that this transport will participate in.
void enable_participant_ttl(unsigned char maximum_ttl=255)
Enables rebroadcast support up to a certain time to live for other agent's messages.
std::map< std::string, int > trusted_peers_
A container of all trusted peers.
double deadline_
Deadline for packets at which packets drop.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
size_t get_number_of_receive_filtered_types(void) const
Returns the number of types that are filtered after received.
double get_drop_rate(void) const
Returns the percentage of dropped packets to enforce on sends.
void operator=(const QoSTransportSettings &settings)
Assignment operator.
double get_deadline(void) const
Returns the latency deadline in seconds.
void set_deadline(double deadline)
Sets the packet deadline in seconds.
int64_t max_send_bandwidth_
Maximum send bandwidth usage per second before packets drop.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
void clear_buffer_filters(void)
Clears the list of buffer filters.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
knowledge::KnowledgeRecordFilters rebroadcast_filters_
A container for rebroadcast filters.
knowledge::KnowledgeRecordFilters send_filters_
A container for filters applied before sending from this host.
bool remove_banned_peer(const std::string &peer)
Removes a banned peer, if it exists in the list.
filters::BufferFilters buffer_filters_
buffer filters have an encode and decode method
void add_filter(filters::BufferFilter *filter)
Adds a buffer filter to the chain.
void print_num_filters_receive(void) const
Prints the number of filters chained for each type to the receive filter.
size_t get_number_of_buffer_filters(void) const
Returns the number of buffer filters.
void clear_rebroadcast_filters(uint32_t types)
Clears the list of filters for the specified types.
void set_rebroadcast_ttl(unsigned char ttl)
Sets the time to live for our packets.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a binary file.
void clear_rebroadcast_aggregate_filters(void)
Clears the list of rebroadcast time aggregate filters.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
void add_banned_peer(const std::string &peer)
Adds a banned peer.
unsigned char rebroadcast_ttl_
number of rebroadcasts for receivers to ultimately do.
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
void clear_send_filters(uint32_t types)
Clears the list of filters for the specified types.
uint64_t get_drop_burst(void) const
Returns the bursts of packet drops.
std::map< std::string, int > banned_peers_
A container of all banned peers.
void print_num_filters_send(void) const
Prints the number of filters chained for each type to the send filter.
int filter_decode(char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
void clear_receive_filters(uint32_t types)
Clears the list of filters for the specified types.
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
int get_drop_type(void) const
Returns the policy type for packet drops.
void clear_send_aggregate_filters(void)
Clears the list of send time aggregate filters.
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
int64_t max_total_bandwidth_
Maximum bandwidth usage for the transport (receive/send) before drop.
void add_receive_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types after receiving and before applying to the local ...
void update_drop_rate(double drop_rate, int drop_type=PACKET_DROP_DETERMINISTIC, uint64_t drop_burst=1)
Updates a packet drop rate, type, and burst.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
uint64_t packet_drop_burst_
Burst of packet drops.
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting @ return the number of aggregat...
void clear_receive_aggregate_filters(void)
Clears the list of receive time aggregate filters.
void add_send_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types before sending.
size_t get_number_of_rebroadcast_filtered_types(void) const
Returns the number of types that are filtered before rebroadcast.
bool remove_trusted_peer(const std::string &peer)
Removes a trusted peer, if it exists in the list.
Provides context about the transport.
Holds basic transport settings.
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a text file.
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a binary file.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
std::vector< KnowledgeRecord > FunctionArguments
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
std::string file_to_string(const std::string &filename)
Reads a file into a string.
Definition: Utility.cpp:324
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Definition: Utility.cpp:64
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:638