13 : transport_(transport)
28 "UdpTransportReadThread::init:"
29 " UdpTransportReadThread started with queue length %d\n",
38 "UdpTransportReadThread::init:"
39 " setting rules to %s\n",
42 #ifndef _MADARA_NO_KARL_
50 "UdpTransportReadThread::init:"
51 " no permanent rules were set\n");
79 int64_t buffer_remaining = (int64_t)settings_.
queue_length;
83 if (!settings_.
no_sending && records.size () > 0)
90 uint64_t clock = records.begin()->second.clock;
121 " Send bandwidth = %" PRIu64
" B/s\n",
138 static const char print_prefix[] =
"UdpTransportReadThread::run";
142 " entering main service loop.\n",
149 " Unable to allocate buffer of size %" PRIu32
". Exiting thread.\n",
156 "%s: entering a recv on the socket.\n", print_prefix);
158 udp::endpoint remote;
159 boost::system::error_code err;
161 asio::buffer((
void*)buffer, settings_.
queue_length), remote,
162 udp::socket::message_flags{}, err);
164 if (err == asio::error::would_block || bytes_read == 0)
167 "%s: no bytes to read. Proceeding to next wait\n", print_prefix);
179 "%s: unexpected error: %s. Proceeding to next wait\n", print_prefix,
180 err.message().c_str());
205 if (remote.address().to_string() !=
"")
209 " received a message header of %lld bytes from %s:%d\n",
210 print_prefix, (
long long)bytes_read,
211 remote.address().to_string().c_str(), (
int)remote.port());
217 " received %lld bytes from unknown host\n",
218 print_prefix, (
long long)bytes_read);
223 std::stringstream remote_host;
224 remote_host << remote.address().
to_string();
226 remote_host << remote.port();
233 #ifndef _MADARA_NO_KARL_
236 print_prefix, remote_host.str().c_str(), header);
240 if (header->
ttl > 0 && rebroadcast_records.size() > 0 &&
246 rebroadcast(print_prefix, header, rebroadcast_records);
255 " finished iteration.\n",
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
This class provides a distributed knowledge base to users.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
void add(uint64_t size)
Adds a message to the monitor.
QoSTransportSettings settings_
const std::string id_
host:port identifier of this process
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
udp::socket socket_
underlying socket
Container for quality-of-service settings.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
bool no_receiving
if true, never receive over transport
std::string on_data_received_logic
Logic to be evaluated after every successful update.
uint32_t queue_length
Length of the buffer used to store history of events.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
bool no_sending
if true, never send over transport
knowledge::containers::Integer received_data_
received data
knowledge::containers::Integer received_packets_
received packets
void cleanup(void) override
Cleanup function called by thread manager.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
madara::utility::ScopedArray< char > buffer_
buffer for receiving
UdpTransportReadThread(UdpTransport &transport)
knowledge::containers::Integer received_data_max_
max data received
knowledge::containers::Integer received_data_min_
min data received
void init(knowledge::KnowledgeBase &knowledge) override
Initializes MADARA context-related items.
void run(void) override
The main loop internals for the read thread.
knowledge::containers::Integer failed_receives_
bad receives
knowledge::ThreadSafeContext * context_
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
UdpTransport & transport_
UDP-based transport for knowledge.
knowledge::containers::Integer sent_packets
sent packets
knowledge::containers::Integer sent_data_min
min data sent
knowledge::containers::Integer sent_data
sent data
long send_message(const char *buf, size_t size, uint64_t clock)
knowledge::containers::Integer failed_sends
failed sends
knowledge::containers::Integer sent_data_max
max data sent
T * get_ptr(void)
get the underlying pointer
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Copyright(c) 2020 Galois.