19 : settings_ (settings), id_ (id), context_ (0),
20 write_socket_ (write_socket),
22 send_monitor_ (send_monitor),
23 receive_monitor_ (receive_monitor),
24 packet_scheduler_ (packet_scheduler)
37 int rcv_buff_size = 0;
40 size_t opt_len =
sizeof (int);
47 "ZMQTransportReadThread::init:" \
48 " setting up read socket\n");
53 zmq_setsockopt (read_socket_, ZMQ_SUBSCRIBE, 0, 0);
56 "ZMQTransportReadThread::init:" \
57 " setting rcv buff size to settings.queue_length (%d)\n",
60 int result = zmq_setsockopt (
61 read_socket_, ZMQ_RCVBUF, (
void *)&buff_size, opt_len);
65 result = zmq_getsockopt (
66 read_socket_, ZMQ_RCVBUF, (
void *)&rcv_buff_size, &opt_len);
69 "ZMQTransportReadThread::init:" \
70 " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
71 buff_size, rcv_buff_size);
76 "ZMQTransportReadThread::init:" \
77 " ERROR: errno = %s\n",
78 zmq_strerror (zmq_errno ()));
82 result = zmq_setsockopt (
83 read_socket_, ZMQ_RCVTIMEO, (
void *)&timeout, opt_len);
87 result = zmq_getsockopt (
88 read_socket_, ZMQ_RCVTIMEO, (
void *)&timeout, &opt_len);
91 "ZMQTransportReadThread::init:" \
92 " successfully set rcv timeout to %d\n",
98 "ZMQTransportReadThread::init:" \
99 " ERROR: When setting timeout on rcv, errno = %s\n",
100 zmq_strerror (zmq_errno ()));
129 int connect_result = zmq_connect (
132 if (connect_result == 0)
135 "ZMQTransportReadThread::init:" \
136 " successfully connected to %s\n",
142 "ZMQTransportReadThread::init:" \
143 " ERROR: could not connect to %s\n",
146 "ZMQTransportReadThread::init:" \
147 " ERROR: errno = %s\n",
148 zmq_strerror (zmq_errno ()));
159 "ZMQTransportReadThread::init:" \
160 " setting rules to %s\n",
164 #ifndef _MADARA_NO_KARL_ 167 #endif // _MADARA_NO_KARL_ 172 "ZMQTransportReadThread::init:" \
173 " no permanent rules were set\n");
182 "ZMQTransportReadThread::cleanup:" \
183 " starting cleanup\n");
188 "ZMQTransportReadThread::cleanup:" \
189 " closing read socket\n");
193 zmq_setsockopt (
read_socket_, ZMQ_LINGER, (
void *)&option,
sizeof (
int));
201 "ZMQTransportReadThread::cleanup:" \
202 " finished cleanup\n");
208 const char * print_prefix,
228 "ZMQTransportReadThread::send:" \
229 " sending %d bytes on socket\n", result);
236 "ZMQTransportReadThread::send:" \
237 " sent %d bytes on socket\n", result);
250 const char * print_prefix =
"ZMQTransportReadThread::run";
252 size_t zmq_buffer_size = buffer_remaining;
256 " entering main service loop.\n",
265 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
274 " entering a recv on the socket.\n",
278 buffer_remaining = (int64_t)zmq_recv (
283 " past recv on the socket.\n",
286 if (buffer_remaining > 0)
292 " processing %d byte update from %s.\n",
293 print_prefix, (int)buffer_remaining, header->
originator);
297 #ifndef _MADARA_NO_KARL_
305 " done processing %d byte update from %s.\n",
306 print_prefix, (int)buffer_remaining, header->
originator);
318 " wait timeout on new messages. Proceeding to next wait\n",
const QoSTransportSettings settings_
quality-of-service transport settings
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
const std::string id_
host:port identifier of this process
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
void * read_socket_
The multicast socket we are reading from.
knowledge::ThreadSafeContext * context_
knowledge context
MADARA_EXPORT ZMQContext zmq_context
void * write_socket_
underlying socket for sending
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
void cleanup(void)
Cleanup function called by thread manager.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
std::string on_data_received_logic
logic to be evaluated after every successful update
uint32_t queue_length
Length of the buffer used to store history of events.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
Provides monitoring capability of a transport's bandwidth.
void * get_context()
Retrieves the underlying ZMQ context.
Provides functions and classes for the distributed knowledge base.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
void run(void)
The main loop internals for the read thread.
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
madara::utility::ScopedArray< char > buffer_
buffer for receiving