9 namespace madara {
namespace transport {
14 : is_valid_ (false), shutting_down_ (false), id_ (id),
15 settings_ (new_settings), context_ (context)
17 #ifndef _MADARA_NO_KARL_
18 , on_data_received_ (context.get_logger ())
36 "transport::Base::setup" \
37 " setting rules to %s\n",
40 #ifndef _MADARA_NO_KARL_ 44 #endif // _MADARA_NO_KARL_ 49 "transport::Base::setup" \
50 " no permanent rules were set\n");
61 "transport::Base::setup" \
62 " no read domains set. Adding write domain (%s)\n",
70 "transport::Base::setup" \
71 " settings configured with %d read domains\n",
78 std::vector <std::string> domains;
81 std::stringstream buffer;
83 for (
unsigned int i = 0; i < domains.size (); ++i)
87 if (i != domains.size () - 1)
94 "transport::Base::setup" \
95 " Write domain: %s. Read domains: %s\n",
118 #ifndef _MADARA_NO_KARL_
122 const char * print_prefix,
123 const char * remote_host,
129 int max_buffer_size = (int)bytes_read;
132 receive_monitor.
add (bytes_read);
136 " Receive bandwidth = %" PRIu64
" B/s\n",
140 bool is_reduced =
false;
141 bool is_fragment =
false;
145 " calling decode filters on %" PRIu32
" bytes\n",
146 print_prefix, bytes_read);
149 bytes_read = (uint32_t)settings.
filter_decode ((
unsigned char *)buffer,
150 max_buffer_size, max_buffer_size);
154 " Decoding resulted in %" PRIu32
" final bytes\n",
155 print_prefix, bytes_read);
158 int64_t buffer_remaining = (int64_t)bytes_read;
161 rebroadcast_records.clear ();
172 " processing reduced KaRL message from %s\n",
184 " processing KaRL message from %s\n",
195 " processing KaRL fragment message from %s\n",
211 " dropping non-KaRL message with id %s from %s\n",
222 " dropping too short message from %s (length %i)\n",
230 const char * update = header->
read (buffer, buffer_remaining);
234 " header info: %s\n",
235 print_prefix, header->
to_string ().c_str ());
237 if (header->
size < bytes_read)
241 " Message header.size (%" PRIu64
" bytes) is less than actual" 242 " bytes read (%" PRIu32
" bytes). Dropping message.\n",
243 print_prefix, header->
size, bytes_read);
254 " Fragment already exists in fragment map. Dropping.\n",
267 " dropping message from ourself\n",
276 " remote id (%s) is not our own\n",
284 "%s: remote id (%s) is trusted\n",
292 " dropping message from untrusted peer (%s\n",
306 " originator (%s) is trusted\n",
308 originator.c_str ());
314 " dropping message from untrusted originator (%s)\n",
316 originator.c_str ());
326 " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
338 " remote id (%s) message is in our domain\n",
353 " Processing fragment %" PRIu32
" of %s:%" PRIu64
".\n",
372 " Message has been pieced together from fragments. Processing...\n",
380 buffer_remaining = (int64_t)frag_header->
get_size (message);
383 char * buffer_override = (
char *)buffer;
384 memcpy (buffer_override, message, frag_header->
get_size (message));
391 " processing reduced KaRL message from %s\n",
397 update = header->read (buffer, buffer_remaining);
403 " processing KaRL message from %s\n",
408 update = header->read (buffer, buffer_remaining);
416 int actual_updates = 0;
427 uint64_t latency (0);
433 latency = current_time - header->
timestamp;
435 if (latency > deadline)
439 " deadline violation (latency is %" PRIu64
", deadline is %f).\n",
450 " Cannot compute message latency." \
451 " Message header timestamp is in the future." \
452 " message.timestamp = %" PRIu64
", cur_timestamp = %" PRIu64
".\n",
460 " iterating over the %" PRIu32
" updates\n",
467 record.clock = header->
clock;
470 bool dropped =
false;
478 " Send monitor has detected violation of bandwidth limit." \
479 " Dropping packet from rebroadcast list\n", print_prefix);
487 " Receive monitor has detected violation of bandwidth limit." \
488 " Dropping packet from rebroadcast list...\n", print_prefix);
495 " Transport participant TTL is lower than header ttl." \
496 " Dropping packet from rebroadcast list...\n", print_prefix);
501 " Applying %" PRIu32
" updates\n", print_prefix, header->
updates);
504 for (uint32_t i = 0; i < header->
updates; ++i)
507 update = record.read (update, key, buffer_remaining);
509 if (buffer_remaining < 0)
513 " unable to process message. Buffer remaining is negative." \
514 " Server is likely being targeted by custom KaRL tools.\n",
524 " Applying receive filter to %s\n", print_prefix, key.c_str ());
526 record = settings.
filter_receive (record, key, transport_context);
528 if (record.exists ())
532 " Filter results for %s were %s\n", print_prefix,
533 key.c_str (), record.to_string ().c_str ());
535 updates[key] = record;
541 " Filter resulted in dropping %s\n", print_prefix, key.c_str ());
548 if (additionals.size () > 0)
552 " %lld additional records being handled after receive.\n", print_prefix,
553 (
long long)additionals.size ());
555 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
556 i != additionals.end (); ++i)
558 updates[i->first] = i->second;
561 transport_context.clear_records ();
578 " Applying aggregate receive filters.\n", print_prefix);
587 " No aggregate receive filters were applied...\n",
593 " Locking the context to apply updates.\n", print_prefix);
600 " Applying updates to context.\n", print_prefix);
603 for (knowledge::KnowledgeMap::iterator i = updates.begin ();
604 i != updates.end (); ++i)
608 result = i->second.apply (context, i->first, header->
quality,
609 header->
clock,
false);
616 " update %s=%s was rejected\n",
618 key.c_str (), record.to_string ().c_str ());
624 " update %s=%s was accepted\n",
626 key.c_str (), record.to_string ().c_str ());
635 transport_context.set_operation (
640 " Applying rebroadcast filters to receive results.\n", print_prefix);
643 for (knowledge::KnowledgeMap::iterator i = updates.begin ();
644 i != updates.end (); ++i)
647 i->second, i->first, transport_context);
649 if (i->second.exists ())
651 if (i->second.to_string () !=
"")
655 " Filter results for key %s were %s\n", print_prefix,
656 i->first.c_str (), i->second.to_string ().c_str ());
658 rebroadcast_records[i->first] = i->second;
664 " Filter resulted in dropping %s\n", print_prefix,
671 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
672 i != additionals.end (); ++i)
674 rebroadcast_records[i->first] = i->second;
679 " Applying aggregate rebroadcast filters to %d records.\n",
680 print_prefix, rebroadcast_records.size ());
684 && rebroadcast_records.size () > 0)
692 " No aggregate rebroadcast filters were applied...\n",
698 " Returning to caller with %d rebroadcast records.\n",
699 print_prefix, rebroadcast_records.size ());
706 " Rebroadcast packet was dropped...\n",
713 #ifndef _MADARA_NO_KARL_ 716 " evaluating rules in %s\n",
720 context.
evaluate (on_data_received);
721 #endif // _MADARA_NO_KARL_ 728 " no permanent rules were set\n",
732 return actual_updates;
740 int64_t & buffer_remaining,
742 const char * print_prefix,
749 if (header->
ttl > 0 && records.size () > 0 && packet_scheduler.
add ())
752 uint64_t * message_size = (uint64_t *)buffer;
753 int max_buffer_size = (int)buffer_remaining;
756 header->
updates = uint32_t (records.size ());
759 char * update = header->
write (buffer, buffer_remaining);
761 for (knowledge::KnowledgeMap::const_iterator i = records.begin ();
762 i != records.end (); ++i)
764 update = i->second.write (update, i->first, buffer_remaining);
767 if (buffer_remaining > 0)
769 int size = (int)(settings.
queue_length - buffer_remaining);
774 " %" PRIu64
" bytes prepped for rebroadcast packet\n",
781 " calling encode filters\n",
785 result, max_buffer_size);
791 " Not enough buffer for rebroadcasting packet\n",
801 " No rebroadcast necessary.\n",
814 const char * print_prefix)
821 "%s: transport has been told to shutdown",
829 "%s: transport is not valid",
837 bool reduced =
false;
843 " Applying filters before sending...\n",
853 bool dropped =
false;
861 " Send monitor has detected violation of bandwidth limit." \
862 " Dropping packet...\n", print_prefix);
870 " Receive monitor has detected violation of bandwidth limit." \
871 " Dropping packet...\n", print_prefix);
880 for (
const auto &e : orig_updates)
884 " Calling filter chain.\n", print_prefix);
888 *e.second.get_record_unsafe (), e.first, transport_context);
892 " Filter returned.\n", print_prefix);
894 if (result.exists ())
898 " Adding record to update list.\n", print_prefix);
900 filtered_updates[e.first] = result;
906 " Filter removed record from update list.\n", print_prefix);
912 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
913 i != additionals.end (); ++i)
917 " Filter added a record %s to the update list.\n",
918 print_prefix, i->first.c_str ());
919 filtered_updates[i->first] = i->second;
926 " Packet scheduler has dropped packet...\n", print_prefix);
933 " Applying %d aggregate update send filters to %d updates...\n",
935 (int)filtered_updates.size ());
939 filtered_updates.size () > 0)
947 " No aggregate send filters were applied...\n",
955 " Finished applying filters before sending...\n",
958 if (filtered_updates.size () == 0)
962 " Filters removed all data. Nothing to send.\n",
976 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
991 " Preparing message with reduced message header.\n",
1001 " Preparing message with normal message header.\n",
1013 strncpy (header->
domain, this->settings_.write_domain.c_str (),
1014 sizeof (header->
domain) - 1);
1032 header->
updates = uint32_t (filtered_updates.size ());
1038 int max_buffer_size = (int)buffer_remaining;
1041 char * update = header->
write (buffer, buffer_remaining);
1042 uint64_t * message_size = (uint64_t *)buffer;
1067 for (knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin ();
1068 i != filtered_updates.end (); ++i, ++j)
1070 update = i->second.write (update, i->first, buffer_remaining);
1072 if (buffer_remaining > 0)
1076 " update[%d] => encoding %s of type %" PRId32
" and size %" PRIu32
"\n",
1078 j, i->first.c_str (), i->second.type (), i->second.size ());
1084 " unable to encode update[%d] => %s of type %" 1085 PRId32
" and size %" PRIu32
"\n",
1087 j, i->first.c_str (), i->second.type (), i->second.size ());
1093 if (buffer_remaining > 0)
1101 #ifndef _MADARA_NO_KARL_ 1105 " evaluating rules in %s\n",
1113 " rules have been successfully evaluated\n",
1116 #endif // _MADARA_NO_KARL_ 1123 " no permanent rules were set\n",
1130 " calling encode filters\n",
1135 (int)size, max_buffer_size);
1139 " header info before encode: %s\n",
1140 print_prefix, header->
to_string ().c_str ());
This class encapsulates an entry in a KnowledgeBase.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
OriginatorFragmentMap fragment_map
map of fragments received by originator
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the status of the packet scheduler.
QoSTransportSettings settings_
int filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void add_read_domain(const std::string domain)
Adds a read domain to the list of domains to read from.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
uint32_t quality
priority of the update
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
int64_t get_time(void)
Returns a time of day in nanoseconds. If the simtime feature is enabled, this may be simulation time inste...
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
Holds basic transport settings.
Compiled, optimized KaRL logic.
double get_deadline(void) const
Returns the latency deadline in seconds.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Provides context about the transport.
bool add(void)
Adds a message to the monitor.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending. @return the number of aggregate filte...
int get_level(void)
Gets the maximum logging detail level.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
size_t num_read_domains(void) const
Returns the number of read domains.
A thread-safe guard for a context or knowledge base.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
T * get_ptr(void)
get the underlying pointer
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
std::string write_domain
All class members are accessible to users for easy setup.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
void add(uint64_t size)
Adds a message to the monitor.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving. @return the number of aggregate filt...
void attach(const QoSTransportSettings *settings)
Attaches settings.
const std::string id_
host:port identifier of this process
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
std::string on_data_received_logic
logic to be evaluated after every successful update
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
virtual void close(void)
Closes this transport.
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
int filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
Provides monitoring capability of a transport's bandwidth.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
virtual ~Base()=0
Destructor.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting. @return the number of aggregat...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int check_transport(void)
all subclasses should call this method at the beginning of send_data
unsigned char get_participant_ttl(void) const
Returns the maximum time-to-live participation of this transport in rebroadcasting of other agents' m...
TransportSettings & settings(void)
Getter for the transport settings.