16 shutting_down_(false),
18 settings_(new_settings),
21 #ifndef _MADARA_NO_KARL_
23 on_data_received_(context.get_logger())
38 "transport::Base::setup"
39 " setting rules to %s\n",
42 #ifndef _MADARA_NO_KARL_
51 "transport::Base::setup"
52 " no permanent rules were set\n");
63 "transport::Base::setup"
64 " no read domains set. Adding write domain (%s)\n",
72 "transport::Base::setup"
73 " settings configured with %d read domains\n",
80 std::vector<std::string> domains;
83 std::stringstream buffer;
85 for(
unsigned int i = 0; i < domains.size(); ++i)
89 if(i != domains.size() - 1)
96 "transport::Base::setup"
97 " Write domain: %s. Read domains: %s\n",
114 #ifndef _MADARA_NO_KARL_
118 const char* print_prefix,
const char* remote_host,
MessageHeader*& header)
123 int max_buffer_size = (int)bytes_read;
126 receive_monitor.
add(bytes_read);
130 " Receive bandwidth = %" PRIu64
" B/s\n",
133 bool is_reduced =
false;
134 bool is_fragment =
false;
138 " calling decode filters on %" PRIu32
" bytes\n",
139 print_prefix, bytes_read);
143 (
char*)buffer, max_buffer_size, max_buffer_size);
147 " Decoding resulted in %" PRIu32
" final bytes\n",
148 print_prefix, bytes_read);
151 int64_t buffer_remaining = (int64_t)bytes_read;
154 rebroadcast_records.clear();
160 std::map<std::string, std::vector<knowledge::KnowledgeRecord>> past_updates;
168 " processing reduced KaRL message from %s\n",
169 print_prefix, remote_host);
179 " %s: processing KaRL message from %s\n",
180 print_prefix,
id.c_str(), remote_host);
189 " processing KaRL fragment message from %s\n",
190 print_prefix, remote_host);
203 " dropping non-KaRL message with id %s from %s\n",
204 print_prefix, identifier, remote_host);
212 " dropping too short message from %s (length %i)\n",
213 print_prefix, remote_host, bytes_read);
218 const char* update = header->
read(buffer, buffer_remaining);
222 " header info: %s\n",
223 print_prefix, header->
to_string().c_str());
225 if(header->
size < bytes_read)
229 " Message header.size (%" PRIu64
" bytes) is less than actual"
230 " bytes read (%" PRIu32
" bytes). Dropping message.\n",
231 print_prefix, header->
size, bytes_read);
242 " Fragment already exists in fragment map. Dropping.\n",
255 " %s: dropping message from ourself\n",
256 print_prefix,
id.c_str());
264 " remote id (%s) is not our own\n",
265 print_prefix, remote_host);
271 "%s: remote id (%s) is trusted\n", print_prefix, remote_host);
277 " dropping message from untrusted peer (%s\n",
278 print_prefix, remote_host);
290 " originator (%s) is trusted\n",
291 print_prefix, originator.c_str());
297 " dropping message from untrusted originator (%s)\n",
298 print_prefix, originator.c_str());
308 " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
309 print_prefix, remote_host, header->
domain);
318 " remote id (%s) message is in our domain\n",
319 print_prefix, remote_host);
331 uint64_t total_size = 0;
335 " Processing fragment %" PRIu32
" of %s:%" PRIu64
".\n",
345 if(!message || total_size == 0)
351 char* buffer_override = (
char*)buffer;
352 memcpy(buffer_override, message, total_size);
360 if (decode_result <= 0)
364 " ERROR: Unable to decode fragments. Likely incorrect filters.\n",
373 " Message has been pieced together from fragments. Processing...\n",
381 buffer_remaining = (int64_t)decode_result;
392 " processing reduced KaRL message from %s\n",
393 print_prefix, remote_host);
397 update = header->
read(buffer, buffer_remaining);
403 " processing KaRL message from %s\n",
404 print_prefix, remote_host);
407 update = header->
read(buffer, buffer_remaining);
413 " ERROR: defrag resulted in unknown message header.\n",
421 " past fragment header create.\n",
428 int actual_updates = 0;
434 " create transport_context with: "
435 "originator(%s), domain(%s), remote(%s), time(%" PRIu64
").\n",
446 " past transport_context create.\n",
455 latency = current_time - header->
timestamp;
457 if(latency > deadline)
461 " deadline violation (latency is %" PRIu64
", deadline is %f).\n",
462 print_prefix, latency, deadline);
471 " Cannot compute message latency."
472 " Message header timestamp is in the future."
473 " message.timestamp = %" PRIu64
", cur_timestamp = %" PRIu64
".\n",
474 print_prefix, header->
timestamp, current_time);
480 " iterating over the %" PRIu32
" updates\n",
481 print_prefix, header->
updates);
489 bool dropped =
false;
496 " Send monitor has detected violation of bandwidth limit."
497 " Dropping packet from rebroadcast list\n",
506 " Receive monitor has detected violation of bandwidth limit."
507 " Dropping packet from rebroadcast list...\n",
515 " Transport participant TTL is lower than header ttl."
516 " Dropping packet from rebroadcast list...\n",
522 " Applying %" PRIu32
" updates\n",
523 print_prefix, header->
updates);
525 const auto add_record = [&](
const std::string& key,
527 auto& entry = updates[key];
534 past_updates[key].emplace_back(std::move(rec));
538 entry = std::move(rec);
543 for(uint32_t i = 0; i < header->
updates; ++i)
546 update = record.
read(update, key, buffer_remaining);
548 if(buffer_remaining < 0)
552 " unable to process message. Buffer remaining is negative."
553 " Server is likely being targeted by custom KaRL tools.\n",
563 " Applying receive filter to %s (clk %i, qual %i) = %s\n",
564 print_prefix, key.c_str(), record.
clock, record.
quality,
573 " Filter results for %s were %s\n",
574 print_prefix, key.c_str(), record.
to_string().c_str());
576 add_record(key, record);
582 " Filter resulted in dropping %s\n",
583 print_prefix, key.c_str());
590 if(additionals.size() > 0)
594 " %lld additional records being handled after receive.\n",
595 print_prefix, (
long long)additionals.size());
597 for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
598 i != additionals.end(); ++i)
600 add_record(i->first, i->second);
618 " Applying aggregate receive filters.\n",
627 " No aggregate receive filters were applied...\n",
633 " Locking the context to apply updates.\n",
641 " Applying updates to context.\n",
646 for(knowledge::KnowledgeMap::iterator i = updates.begin();
647 i != updates.end(); ++i)
653 result = record.
apply(
654 context, i->first, header->
quality, header->
clock,
false);
661 " update %s=%s was rejected\n",
662 print_prefix, key.c_str(), record.
to_string().c_str());
668 " update %s=%s was accepted\n",
669 print_prefix, key.c_str(), record.
to_string().c_str());
673 auto iter = past_updates.find(i->first);
674 if(iter != past_updates.end())
676 for(
auto& cur : iter->second)
697 " Applying rebroadcast filters to receive results.\n",
701 for(knowledge::KnowledgeMap::iterator i = updates.begin();
702 i != updates.end(); ++i)
707 if(i->second.exists())
709 if(i->second.to_string() !=
"")
713 " Filter results for key %s were %s\n",
714 print_prefix, i->first.c_str(), i->second.to_string().c_str());
716 rebroadcast_records[i->first] = i->second;
722 " Filter resulted in dropping %s\n",
723 print_prefix, i->first.c_str());
730 for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
731 i != additionals.end(); ++i)
733 rebroadcast_records[i->first] = i->second;
738 " Applying aggregate rebroadcast filters to %d records.\n",
739 print_prefix, rebroadcast_records.size());
743 rebroadcast_records.size() > 0)
751 " No aggregate rebroadcast filters were applied...\n",
757 " Returning to caller with %d rebroadcast records.\n",
758 print_prefix, rebroadcast_records.size());
764 " Rebroadcast packet was dropped...\n",
771 #ifndef _MADARA_NO_KARL_
774 " evaluating rules in %s\n",
784 " no permanent rules were set\n",
788 return actual_updates;
798 if(header->
ttl > 0 && records.size() > 0 && packet_scheduler.
add())
801 uint64_t* message_size = (uint64_t*)buffer;
802 int max_buffer_size = (int)buffer_remaining;
805 header->
updates = uint32_t(records.size());
808 char* update = header->
write(buffer, buffer_remaining);
810 for(knowledge::KnowledgeMap::const_iterator i = records.begin();
811 i != records.end(); ++i)
813 update = i->second.write(update, i->first, buffer_remaining);
816 if(buffer_remaining > 0)
818 int size = (int)(settings.
queue_length - buffer_remaining);
823 " %" PRIu64
" bytes prepped for rebroadcast packet\n",
830 " calling encode filters\n",
839 " Not enough buffer for rebroadcasting packet\n",
849 " No rebroadcast necessary.\n",
861 const char* print_prefix)
868 "%s: transport has been told to shutdown", print_prefix);
875 "%s: transport is not valid", print_prefix);
882 uint64_t latest_toi = 0;
883 bool reduced =
false;
889 " Applying filters to %zu updates before sending...\n",
890 print_prefix, orig_updates.size());
897 bool dropped =
false;
904 " Send monitor has detected violation of bandwidth limit."
905 " Dropping packet...\n",
914 " Receive monitor has detected violation of bandwidth limit."
915 " Dropping packet...\n",
927 for(
auto e : orig_updates)
931 " Calling filter chain of %s.\n",
932 print_prefix, e.first.c_str());
934 const auto record = e.second;
936 if(record.toi() > latest_toi)
938 latest_toi = record.toi();
947 " Filter returned for %s.\n",
948 print_prefix, e.first.c_str());
951 if(result.
exists() || !record.exists())
955 " Adding record to update list.\n",
958 filtered_updates.emplace(std::make_pair(e.first, result));
964 " Filter removed record from update list.\n",
971 " Through individual record filters. Proceeding to add update "
978 for(knowledge::KnowledgeMap::const_iterator i = additionals.begin();
979 i != additionals.end(); ++i)
983 " Filter added a record %s to the update list.\n",
984 print_prefix, i->first.c_str());
985 filtered_updates.emplace(std::make_pair(i->first, i->second));
990 for(
auto e : orig_updates)
992 const auto record = e.second;
994 if(record.toi() > latest_toi)
996 latest_toi = record.toi();
1001 " Adding record %s to update list.\n",
1002 print_prefix, e.first.c_str());
1004 filtered_updates.emplace(std::make_pair(e.first, record));
1026 " Packet scheduler has dropped packet...\n",
1034 " Applying %d aggregate update send filters to %d updates...\n",
1036 (
int)filtered_updates.size());
1040 filtered_updates.size() > 0)
1048 " No aggregate send filters were applied...\n",
1056 " Finished applying filters before sending...\n",
1059 if(filtered_updates.size() == 0)
1063 " Filters removed all data. Nothing to send.\n",
1077 " Unable to allocate buffer of size %" PRIu32
". Exiting thread.\n",
1090 " Preparing message with reduced message header.\n",
1100 " Preparing message with normal message header.\n",
1130 header->
updates = uint32_t(filtered_updates.size());
1136 int max_buffer_size = (int)buffer_remaining;
1139 char* update = header->
write(buffer, buffer_remaining);
1140 uint64_t* message_size = (uint64_t*)buffer;
1141 uint32_t* message_updates = (uint32_t*)(buffer + 116);
1166 uint32_t actual_updates = 0;
1167 for(knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin();
1168 i != filtered_updates.end(); ++i)
1170 const auto& key = i->first;
1171 const auto& rec = i->second;
1177 " update[%d] => value is empty\n",
1178 print_prefix, j, key.c_str());
1182 update = rec.write(update, key, buffer_remaining);
1184 if(buffer_remaining > 0)
1188 " update[%d] => encoding %s of type %" PRId32
" and size %" PRIu32
1190 print_prefix, j, key.c_str(), rec.type(), rec.size(), rec.toi());
1198 " unable to encode update[%d] => %s of type %" PRId32
1199 " and size %" PRIu32
"\n",
1200 print_prefix, j, key.c_str(), rec.type(), rec.size());
1210 auto buf = rec.share_circular_buffer();
1211 auto end = buf->end();
1212 auto cur = buf->begin();
1218 return lhs < rhs.
toi();
1221 for(; cur != end; ++cur)
1230 if(buffer_remaining > 0)
1233 header->
size = size;
1235 header->
updates = actual_updates;
1241 #ifndef _MADARA_NO_KARL_
1245 " evaluating rules in %s\n",
1252 " rules have been successfully evaluated\n",
1261 " no permanent rules were set\n",
1268 " calling encode filters\n",
1277 " header info before encode: %s\n",
1278 print_prefix, header->
to_string().c_str());
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
const ThreadSafeContext * context_
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
Compiled, optimized KaRL logic.
A thread-safe guard for a context or knowledge base.
This class encapsulates an entry in a KnowledgeBase.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
uint64_t clock
last modification lamport clock time
void set_toi(uint64_t new_toi)
int apply(madara::knowledge::ThreadSafeContext &context, const std::string &key, unsigned int quality, uint64_t clock, bool perform_lock)
Apply the knowledge record to a context, given some quality and clock.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
uint32_t quality
priority of the update
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining.
This class stores variables and their values for use by any entity needing state information in a thr...
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
int get_level(void)
Gets the maximum logging detail level.
Provides monitoring capability of a transport's bandwidth.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void add(uint64_t size)
Adds a message to the monitor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
madara::utility::ScopedArray< char > buffer_
buffer for sending
QoSTransportSettings settings_
madara::knowledge::ThreadSafeContext & context_
int check_transport(void)
all subclasses should call this method at the beginning of send_data
uint64_t last_toi_sent_
Latest TOI the previous send operation included.
const std::string id_
host:port identifier of this process
virtual int setup(void)
all subclasses should call this method at the end of its setup
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
long prep_send(const knowledge::KnowledgeMap &orig_updates, const char *print_prefix)
Preps a message for sending.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
virtual ~Base()=0
Destructor.
virtual void close(void)
Closes this transport.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
Provides scheduler for dropping packets.
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the status of the packet scheduler.
bool add(void)
Adds a message to the monitor.
void attach(const QoSTransportSettings *settings)
Attaches settings.
Container for quality-of-service settings.
size_t get_number_of_send_filtered_types(void) const
Returns the number of types that are filtered before send.
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving.
int filter_encode(char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
double get_deadline(void) const
Returns the latency deadline in seconds.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
int filter_decode(char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting.
Provides context about the transport.
@ REBROADCASTING_OPERATION
void set_operation(int64_t operation)
Sets the operation that the context is/should be performing.
void clear_records(void)
Clears records added through filtering operations.
const knowledge::KnowledgeMap & get_records(void) const
Returns the additional records stored in the context.
Holds basic transport settings.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
OriginatorFragmentMap fragment_map
Map of fragments received by originator.
void add_read_domain(const std::string &domain)
Adds a read domain to the list of domains to read from.
std::string on_data_received_logic
Logic to be evaluated after every successful update.
uint32_t queue_length
Length of the buffer used to store history of events.
bool send_history
if true, send all updates since last send, for records that have history enabled (if the history capa...
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
std::string write_domain
All class members are accessible to users for easy setup.
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
bool send_reduced_message_header
Send a reduced message header (clock, size, updates, KaRL id)
size_t num_read_domains(void) const
Returns the number of read domains.
T * get_ptr(void)
get the underlying pointer
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, uint64_t &total_size, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
MADARA_EXPORT bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
int64_t get_time(void)
Returns a time of day in nanoseconds. If simtime feature is enabled, this may be simulation time inste...
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Copyright(c) 2020 Galois.