15 : settings_(settings),
18 write_socket_(write_socket),
20 send_monitor_(send_monitor),
21 receive_monitor_(receive_monitor),
22 packet_scheduler_(packet_scheduler)
31 if (!settings_.no_receiving)
34 int rcv_buff_size = 0;
36 int buff_size = settings_.queue_length;
39 size_t opt_len =
sizeof(int);
41 if (settings_.debug_to_kb_prefix !=
"")
43 received_packets_.set_name(
44 settings_.debug_to_kb_prefix +
".received_packets",
knowledge);
45 failed_receives_.set_name(
46 settings_.debug_to_kb_prefix +
".failed_receives",
knowledge);
47 received_data_max_.set_name(
48 settings_.debug_to_kb_prefix +
".received_data_max",
knowledge);
49 received_data_min_.set_name(
50 settings_.debug_to_kb_prefix +
".received_data_min",
knowledge);
51 received_data_.set_name(
52 settings_.debug_to_kb_prefix +
".received_data",
knowledge);
56 if (settings_.queue_length > 0)
57 buffer_ =
new char[settings_.queue_length];
60 "ZMQTransportReadThread::init:"
61 " setting up read socket\n");
65 if (read_socket_ == NULL)
68 "ZMQTransportReadThread::init:"
69 " ERROR: could not create SUB socket\n");
71 "ZMQTransportReadThread::init:"
72 " ERROR: errno = %s\n",
73 zmq_strerror(zmq_errno()));
77 result = zmq_setsockopt(read_socket_, ZMQ_SUBSCRIBE, 0, 0);
82 "ZMQTransportReadThread::init:"
83 " successfully set sockopt for ZMQ_SUBSCRIBE\n");
88 "ZMQTransportReadThread::init:"
89 " ERROR: errno = %s\n",
90 zmq_strerror(zmq_errno()));
95 zmq_setsockopt(read_socket_, ZMQ_LINGER, (
void*)&zero,
sizeof(
int));
98 "ZMQTransportReadThread::init:"
99 " setting rcv buff size to settings.queue_length (%d)\n",
103 zmq_setsockopt(read_socket_, ZMQ_RCVBUF, (
void*)&buff_size, opt_len);
107 result = zmq_getsockopt(
108 read_socket_, ZMQ_RCVBUF, (
void*)&rcv_buff_size, &opt_len);
111 "ZMQTransportReadThread::init:"
112 " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
113 buff_size, rcv_buff_size);
118 "ZMQTransportReadThread::init:"
119 " ERROR: errno = %s\n",
120 zmq_strerror(zmq_errno()));
124 zmq_setsockopt(read_socket_, ZMQ_RCVTIMEO, (
void*)&timeout, opt_len);
129 zmq_getsockopt(read_socket_, ZMQ_RCVTIMEO, (
void*)&timeout, &opt_len);
132 "ZMQTransportReadThread::init:"
133 " successfully set rcv timeout to %d\n",
139 "ZMQTransportReadThread::init:"
140 " ERROR: When setting timeout on rcv, errno = %s\n",
141 zmq_strerror(zmq_errno()));
144 if (settings_.hosts.size() >= 1)
156 if (settings_.hosts.size() >= 1)
166 for (; i < settings_.hosts.size(); ++i)
169 zmq_connect(read_socket_, settings_.hosts[i].c_str());
171 if (connect_result == 0)
174 "ZMQTransportReadThread::init:"
175 " successfully connected to %s\n",
176 settings_.hosts[i].c_str());
181 "ZMQTransportReadThread::init:"
182 " ERROR: could not connect to %s\n",
183 settings_.hosts[i].c_str());
185 "ZMQTransportReadThread::init:"
186 " ERROR: errno = %s\n",
187 zmq_strerror(zmq_errno()));
195 if (settings_.on_data_received_logic.length() != 0)
198 "ZMQTransportReadThread::init:"
199 " setting rules to %s\n",
200 settings_.on_data_received_logic.c_str());
202 #ifndef _MADARA_NO_KARL_
204 on_data_received_ =
context_->compile(settings_.on_data_received_logic);
210 "ZMQTransportReadThread::init:"
211 " no permanent rules were set\n");
219 "ZMQTransportReadThread::cleanup:"
220 " starting cleanup\n");
222 if (read_socket_ != 0)
225 "ZMQTransportReadThread::cleanup:"
226 " closing read socket\n");
228 int result = zmq_close(read_socket_);
234 "ZMQTransportReadThread::cleanup:"
235 " ERROR: errno = %s\n",
236 zmq_strerror(zmq_errno()));
241 "ZMQTransportReadThread::cleanup:"
242 " finished cleanup\n");
249 int64_t buffer_remaining = (int64_t)settings_.queue_length;
250 char* buffer = buffer_.get_ptr();
253 if (!settings_.no_sending)
256 print_prefix, header, records, packet_scheduler_);
260 if (settings_.hosts.size() > 0 && result > 0)
263 "ZMQTransportReadThread::send:"
264 " sending %d bytes on socket\n",
268 result = zmq_send(write_socket_, (
void*)buffer_.get_ptr(),
269 (
size_t)result, ZMQ_DONTWAIT);
272 "ZMQTransportReadThread::send:"
273 " sent %d bytes on socket\n",
282 if (!settings_.no_receiving)
285 char* buffer = buffer_.get_ptr();
286 const char* print_prefix =
"ZMQTransportReadThread::run";
287 int64_t buffer_remaining = settings_.queue_length;
288 size_t zmq_buffer_size = buffer_remaining;
292 " entering main service loop.\n",
301 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
302 print_prefix, settings_.queue_length);
309 " entering a recv on the socket.\n",
314 (int64_t)zmq_recv(read_socket_, (
void*)buffer, zmq_buffer_size, 0);
318 " past recv on the socket.\n",
321 if (buffer_remaining > 0)
323 if (settings_.debug_to_kb_prefix !=
"")
325 received_data_ += buffer_remaining;
328 if (received_data_max_ < buffer_remaining)
330 received_data_max_ = buffer_remaining;
332 if (received_data_min_ > buffer_remaining || received_data_min_ == 0)
334 received_data_min_ = buffer_remaining;
342 " processing %d byte update from %s.\n",
343 print_prefix, (
int)buffer_remaining, header->
originator);
346 *
context_, settings_, send_monitor_, receive_monitor_,
348 #ifndef _MADARA_NO_KARL_
355 " done processing %d byte update from %s.\n",
356 print_prefix, (
int)buffer_remaining, header->
originator);
368 " wait timeout on new messages. Proceeding to next wait\n",
371 if (settings_.debug_to_kb_prefix !=
"")
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
const ThreadSafeContext * context_
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
This class provides a distributed knowledge base to users.
Provides monitoring capability of a transport's bandwidth.
Provides scheduler for dropping packets.
Holds basic transport settings.
void * get_context()
Retrieves the underlying ZMQ context.
void run(void)
The main loop internals for the read thread.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
void cleanup(void)
Cleanup function called by thread manager.
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
Provides functions and classes for the distributed knowledge base.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT ZMQContext zmq_context
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.