14 participant_rebroadcast_ttl_(0),
17 packet_drop_rate_(0.0),
18 packet_drop_burst_(1),
19 max_send_bandwidth_(-1),
20 max_total_bandwidth_(-1),
28 rebroadcast_ttl_(settings.rebroadcast_ttl_),
29 participant_rebroadcast_ttl_(settings.participant_rebroadcast_ttl_),
30 trusted_peers_(settings.trusted_peers_),
31 banned_peers_(settings.banned_peers_),
32 rebroadcast_filters_(settings.rebroadcast_filters_),
33 receive_filters_(settings.receive_filters_),
34 send_filters_(settings.send_filters_),
35 buffer_filters_(settings.buffer_filters_),
36 packet_drop_rate_(settings.packet_drop_rate_),
37 packet_drop_type_(settings.packet_drop_type_),
38 packet_drop_burst_(settings.packet_drop_burst_),
39 max_send_bandwidth_(settings.max_send_bandwidth_),
40 max_total_bandwidth_(settings.max_total_bandwidth_),
41 deadline_(settings.deadline_)
49 participant_rebroadcast_ttl_(0),
52 packet_drop_rate_(0.0),
53 max_send_bandwidth_(-1),
54 max_total_bandwidth_(-1),
96 *lhs_base = *rhs_base;
120 rebroadcast_ttl_ = 0;
121 participant_rebroadcast_ttl_ = 0;
122 trusted_peers_.clear();
123 banned_peers_.clear();
127 packet_drop_rate_ = 0.0;
129 packet_drop_burst_ = 1;
130 max_send_bandwidth_ = -1;
131 max_total_bandwidth_ = -1;
137 *lhs_base = *rhs_base;
144 rebroadcast_ttl_ = ttl;
150 return rebroadcast_ttl_;
156 participant_rebroadcast_ttl_ = ttl;
162 return participant_rebroadcast_ttl_;
168 banned_peers_.erase(peer);
169 trusted_peers_[peer] = 1;
175 trusted_peers_.erase(peer);
176 banned_peers_[peer] = 1;
182 bool condition =
false;
183 if(trusted_peers_.find(peer) != trusted_peers_.end())
185 trusted_peers_.erase(peer);
194 bool condition =
false;
195 if(banned_peers_.find(peer) != banned_peers_.end())
197 banned_peers_.erase(peer);
206 bool condition =
false;
215 if(trusted_peers_.size() == 0)
217 if(banned_peers_.find(peer) == banned_peers_.end())
222 condition = (trusted_peers_.find(peer) != trusted_peers_.end());
232 send_filters_.add(types,
function);
238 send_filters_.add(types, functor);
244 send_filters_.add(
function);
250 send_filters_.add(functor);
256 receive_filters_.add(functor);
262 buffer_filters_.push_back(functor);
269 receive_filters_.add(types,
function);
275 receive_filters_.add(types, functor);
282 receive_filters_.add(
function);
289 rebroadcast_filters_.add(types,
function);
295 rebroadcast_filters_.add(types, functor);
302 rebroadcast_filters_.add(
function);
308 rebroadcast_filters_.add(functor);
314 uint32_t types, jobject&
object)
317 "QoSTransportSettings::add: "
318 "Adding Java record filter to receive queue\n");
320 receive_filters_.add(types,
object);
324 uint32_t types, jobject&
object)
326 send_filters_.add(types,
object);
330 uint32_t types, jobject&
object)
332 rebroadcast_filters_.add(types,
object);
338 receive_filters_.add(
object);
343 send_filters_.add(
object);
349 rebroadcast_filters_.add(
object);
354 #ifdef _MADARA_PYTHON_CALLBACKS_
357 uint32_t types, boost::python::object&
object)
359 receive_filters_.add(types,
object);
363 uint32_t types, boost::python::object&
object)
365 send_filters_.add(types,
object);
369 uint32_t types, boost::python::object&
object)
371 rebroadcast_filters_.add(types,
object);
375 boost::python::object&
object)
377 receive_filters_.add(
object);
381 boost::python::object&
object)
383 send_filters_.add(
object);
387 boost::python::object&
object)
389 rebroadcast_filters_.add(
object);
397 send_filters_.attach(context);
398 receive_filters_.attach(context);
399 rebroadcast_filters_.attach(context);
404 send_filters_.clear(types);
409 send_filters_.clear_aggregate_filters();
415 receive_filters_.clear(types);
420 receive_filters_.clear_aggregate_filters();
426 rebroadcast_filters_.clear(types);
431 buffer_filters_.clear();
437 rebroadcast_filters_.clear_aggregate_filters();
445 return send_filters_.filter(input, name, context);
452 send_filters_.filter(records, transport_context);
456 char* source,
int size,
int max_size)
const
459 for(filters::BufferFilters::const_iterator i = buffer_filters_.begin();
460 i != buffer_filters_.end(); ++i)
463 "QoSTransportSettings::filter_encode: size before encode: "
467 size = (*i)->encode(source, size, max_size);
470 "QoSTransportSettings::filter_encode: size after encode: "
474 if(max_size > size + 20)
476 memmove(source + 20, source, size);
480 header.
size = (uint64_t)size;
482 int64_t buffer_remaining = 20;
484 header.
write((
char*)source, buffer_remaining);
489 "QoSTransportSettings::filter_encode: header: "
490 "%s:%s within size %d\n",
495 std::stringstream buffer;
496 buffer <<
"QoSTransportSettings::filter_encode: ";
497 buffer << (size + 20) <<
" 20 byte size encoding cannot fit in ";
498 buffer << max_size <<
" byte buffer\n";
508 char* source,
int size,
int max_size)
const
514 header.
read((
char*)source, buffer_size);
520 "QoSTransportSettings::filter_decode: header: "
521 " Detected %s. decode has to be called on defragged buffer.\n",
529 "QoSTransportSettings::filter_decode: header: "
534 if(buffer_filters_.size() == 0)
541 "QoSTransportSettings::filter_decode: header: "
548 "QoSTransportSettings::filter_decode: header: "
549 " Detected %s, which is not a message or checkpoint header\n",
557 for(filters::BufferFilters::const_reverse_iterator i =
558 buffer_filters_.rbegin();
559 i != buffer_filters_.rend(); ++i)
565 header.
read((
char*)source, buffer_size);
567 if(header.
size > (uint64_t)max_size)
570 "QoSTransportSettings::filter_decode: header: "
571 " %d byte size encoding cannot fit in %d byte buffer\n",
572 (
int)header.
size, max_size);
578 "QoSTransportSettings::filter_decode: header: "
585 "QoSTransportSettings::filter_decode: filter is null somehow\n");
592 "QoSTransportSettings::filter_decode: filter is not null\n");
598 "QoSTransportSettings::filter_decode: buffer filter %s is a "
605 "QoSTransportSettings::filter_decode: buffer filter %s doesn't "
614 "QoSTransportSettings::filter_decode: size before decode: "
615 " %d of %d (header.size=%d)\n",
616 size, max_size, (
int)header.
size);
619 (
int)header.
size, max_size);
622 "QoSTransportSettings::filter_decode: size after decode: "
623 " %d of %d (header.size=%d)\n",
624 size, max_size, (
int)header.
size);
635 "QoSTransportSettings::filter_decode: "
636 " %d byte size encoding cannot fit in %d byte buffer\n",
651 return receive_filters_.filter(input, name, context);
658 receive_filters_.filter(records, transport_context);
666 return rebroadcast_filters_.filter(input, name, context);
673 rebroadcast_filters_.filter(records, transport_context);
678 send_filters_.print_num_filters();
684 receive_filters_.print_num_filters();
690 rebroadcast_filters_.print_num_filters();
697 return send_filters_.get_number_of_filtered_types();
704 return send_filters_.get_number_of_aggregate_filters();
710 return rebroadcast_filters_.get_number_of_filtered_types();
716 return rebroadcast_filters_.get_number_of_aggregate_filters();
723 return receive_filters_.get_number_of_filtered_types();
729 return receive_filters_.get_number_of_aggregate_filters();
735 return buffer_filters_.size();
739 double drop_rate,
int drop_type, uint64_t drop_burst)
741 packet_drop_rate_ = drop_rate;
742 packet_drop_type_ = drop_type;
743 packet_drop_burst_ = drop_burst;
748 return packet_drop_rate_;
753 return packet_drop_type_;
758 return packet_drop_burst_;
762 int64_t send_bandwidth)
764 max_send_bandwidth_ = send_bandwidth;
770 return max_send_bandwidth_;
774 int64_t total_bandwidth)
776 max_total_bandwidth_ = total_bandwidth;
782 return max_total_bandwidth_;
787 deadline_ = deadline;
807 (
unsigned char)
knowledge.get(prefix +
".rebroadcast_ttl").to_integer();
808 participant_rebroadcast_ttl_ =
809 (
unsigned char)
knowledge.get(prefix +
".participant_rebroadcast_ttl")
812 std::vector<std::string> trusted_keys, banned_keys;
814 trusted_peers.
keys(trusted_keys);
815 banned_peers.
keys(banned_keys);
817 for(
size_t i = 0; i < trusted_keys.size(); ++i)
819 trusted_peers_[trusted_keys[i]] = 1;
822 for(
size_t i = 0; i < banned_keys.size(); ++i)
824 banned_peers_[banned_keys[i]] = 1;
829 value =
knowledge.get(prefix +
".packet_drop_rate");
832 packet_drop_rate_ = (uint32_t)value.
to_double();
835 value =
knowledge.get(prefix +
".packet_drop_type");
841 value =
knowledge.get(prefix +
".packet_drop_burst");
844 packet_drop_burst_ = (uint64_t)value.
to_integer();
847 value =
knowledge.get(prefix +
".max_send_bandwidth");
850 max_send_bandwidth_ = (int64_t)value.
to_integer();
853 value =
knowledge.get(prefix +
".max_total_bandwidth");
856 max_total_bandwidth_ = (int64_t)value.
to_integer();
859 value =
knowledge.get(prefix +
".deadline");
874 #ifndef _MADARA_NO_KARL_
884 (
unsigned char)
knowledge.get(prefix +
".rebroadcast_ttl").to_integer();
885 participant_rebroadcast_ttl_ =
886 (
unsigned char)
knowledge.get(prefix +
".participant_rebroadcast_ttl")
889 std::vector<std::string> trusted_keys, banned_keys;
891 trusted_peers.
keys(trusted_keys);
892 banned_peers.
keys(banned_keys);
894 for(
size_t i = 0; i < trusted_keys.size(); ++i)
896 trusted_peers_[trusted_keys[i]] = 1;
899 for(
size_t i = 0; i < banned_keys.size(); ++i)
901 banned_peers_[banned_keys[i]] = 1;
906 value =
knowledge.get(prefix +
".packet_drop_rate");
909 packet_drop_rate_ = (uint32_t)value.
to_double();
912 value =
knowledge.get(prefix +
".packet_drop_type");
918 value =
knowledge.get(prefix +
".packet_drop_burst");
921 packet_drop_burst_ = (uint64_t)value.
to_integer();
924 value =
knowledge.get(prefix +
".max_send_bandwidth");
927 max_send_bandwidth_ = (int64_t)value.
to_integer();
930 value =
knowledge.get(prefix +
".max_total_bandwidth");
933 max_total_bandwidth_ = (int64_t)value.
to_integer();
936 value =
knowledge.get(prefix +
".deadline");
957 knowledge.set(prefix +
".participant_rebroadcast_ttl",
958 Integer(participant_rebroadcast_ttl_));
960 for(std::map<std::string, int>::const_iterator i = trusted_peers_.begin();
961 i != trusted_peers_.end(); ++i)
966 for(std::map<std::string, int>::const_iterator i = banned_peers_.begin();
967 i != banned_peers_.end(); ++i)
972 knowledge.set(prefix +
".packet_drop_rate", packet_drop_rate_);
976 knowledge.set(prefix +
".max_send_bandwidth",
Integer(max_send_bandwidth_));
977 knowledge.set(prefix +
".max_total_bandwidth",
Integer(max_total_bandwidth_));
978 knowledge.set(prefix +
".deadline", deadline_);
992 #ifndef _MADARA_NO_KARL_
1003 knowledge.set(prefix +
".participant_rebroadcast_ttl",
1004 Integer(participant_rebroadcast_ttl_));
1006 for(std::map<std::string, int>::const_iterator i = trusted_peers_.begin();
1007 i != trusted_peers_.end(); ++i)
1012 for(std::map<std::string, int>::const_iterator i = banned_peers_.begin();
1013 i != banned_peers_.end(); ++i)
1018 knowledge.set(prefix +
".packet_drop_rate", packet_drop_rate_);
1022 knowledge.set(prefix +
".max_send_bandwidth",
Integer(max_send_bandwidth_));
1023 knowledge.set(prefix +
".max_total_bandwidth",
Integer(max_total_bandwidth_));
1024 knowledge.set(prefix +
".deadline", deadline_);
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
madara::knowledge::KnowledgeRecord::Integer Integer
An exception for general memory errors like out-of-memory.
Abstract base class for implementing aggregate record filters via a functor interface.
Abstract base class for implementing buffer filters via a functor interface.
Abstract base class for implementing individual record filters via a functor interface.
This class provides a distributed knowledge base to users.
This class encapsulates an entry in a KnowledgeBase.
double to_double(void) const
converts the value to a float/double.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
Integer to_integer(void) const
converts the value to an integer.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides an interface for external functions into the MADARA KaRL variable settings.
This class stores a map of strings to KaRL variables.
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Container for quality-of-service settings.
double packet_drop_rate_
Rate for dropping packets.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
size_t get_number_of_send_filtered_types(void) const
Returns the number of types that are filtered before send.
knowledge::KnowledgeRecordFilters receive_filters_
A container for receive filters.
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a text file.
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
void set_send_bandwidth_limit(int64_t bandwidth)
Sets a bandwidth limit for sending on this transport in bytes per sec.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
int filter_encode(char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
void add_trusted_peer(const std::string &peer)
Adds a trusted peer.
void set_total_bandwidth_limit(int64_t bandwidth)
Sets a bandwidth limit for receiving and sending over the transport.
void print_num_filters_rebroadcast(void) const
Prints the number of filters chained for each type to the rebroadcast filter.
void add_rebroadcast_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types after receiving and before rebroadcasting (if TTL...
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
unsigned char participant_rebroadcast_ttl_
This field is meant to limit the number of rebroadcasts that this transport will participate in.
void enable_participant_ttl(unsigned char maximum_ttl=255)
Enables rebroadcast support up to a certain time to live for other agent's messages.
std::map< std::string, int > trusted_peers_
A container of all trusted peers.
double deadline_
Deadline for packets at which packets drop.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
size_t get_number_of_receive_filtered_types(void) const
Returns the number of types that are filtered after received.
double get_drop_rate(void) const
Returns the percentage of dropped packets to enforce on sends.
void operator=(const QoSTransportSettings &settings)
Assignment operator.
double get_deadline(void) const
Returns the latency deadline in seconds.
void set_deadline(double deadline)
Sets the packet deadline in seconds.
int64_t max_send_bandwidth_
Maximum send bandwidth usage per second before packets drop.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
void clear_buffer_filters(void)
Clears the list of buffer filters.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
knowledge::KnowledgeRecordFilters rebroadcast_filters_
A container for rebroadcast filters.
knowledge::KnowledgeRecordFilters send_filters_
A container for filters applied before sending from this host.
bool remove_banned_peer(const std::string &peer)
Removes a trusted peer, if it exists in the list.
filters::BufferFilters buffer_filters_
buffer filters have an encode and decode method
void add_filter(filters::BufferFilter *filter)
Adds a buffer filter to the chain.
void print_num_filters_receive(void) const
Prints the number of filters chained for each type to the receive filter.
size_t get_number_of_buffer_filters(void) const
Returns the number of buffer filters.
void clear_rebroadcast_filters(uint32_t types)
Clears the list of filters for the specified types.
void set_rebroadcast_ttl(unsigned char ttl)
Sets the time to live for our packets.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a binary file.
void clear_rebroadcast_aggregate_filters(void)
Clears the list of rebroadcast time aggregate filters.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
void add_banned_peer(const std::string &peer)
Adds a banned peer.
unsigned char rebroadcast_ttl_
number of rebroadcasts for receivers to ultimately do.
virtual ~QoSTransportSettings()
Destructor.
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
void clear_send_filters(uint32_t types)
Clears the list of filters for the specified types.
uint64_t get_drop_burst(void) const
Returns the bursts of packet drops.
std::map< std::string, int > banned_peers_
A container of all banned peers.
void print_num_filters_send(void) const
Prints the number of filters chained for each type to the send filter.
int filter_decode(char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
void clear_receive_filters(uint32_t types)
Clears the list of filters for the specified types.
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
int get_drop_type(void) const
Returns the policy type for packet drops.
QoSTransportSettings()
Default constructor.
void clear_send_aggregate_filters(void)
Clears the list of send time aggregate filters.
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
int64_t max_total_bandwidth_
Maximum bandwidth usage for the transport (receive/send) before drop.
void add_receive_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types after receiving and before applying to the local ...
void update_drop_rate(double drop_rate, int drop_type=PACKET_DROP_DETERMINISTIC, uint64_t drop_burst=1)
Updates a packet drop rate, type, and burst.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
uint64_t packet_drop_burst_
Burst of packet drops.
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting @ return the number of aggregat...
void clear_receive_aggregate_filters(void)
Clears the list of receive time aggregate filters.
int packet_drop_type_
Drop rate type.
void add_send_filter(uint32_t types, knowledge::KnowledgeRecord(*function)(knowledge::FunctionArguments &, knowledge::Variables &))
Adds a filter that will be applied to certain types before sending.
size_t get_number_of_rebroadcast_filtered_types(void) const
Returns the number of types that are filtered before rebroadcast.
bool remove_trusted_peer(const std::string &peer)
Removes a trusted peer, if it exists in the list.
Provides context about the transport.
Holds basic transport settings.
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a text file.
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a binary file.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
Provides container classes for fast knowledge base access and mutation.
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowlegeRecord.
std::vector< KnowledgeRecord > FunctionArguments
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
@ PACKET_DROP_PROBABLISTIC
std::string file_to_string(const std::string &filename)
Reads a file into a string.
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.