MADARA  3.4.1
ZMQTransportReadThread.cpp
Go to the documentation of this file.
2 
6 #include "ZMQContext.h"
7 
8 #include <iostream>
9 #include <algorithm>
10 
12  const TransportSettings& settings, const std::string& id,
13  void* write_socket, BandwidthMonitor& send_monitor,
14  BandwidthMonitor& receive_monitor, PacketScheduler& packet_scheduler)
15  : settings_(settings),
16  id_(id),
17  context_(0),
18  write_socket_(write_socket),
19  read_socket_(0),
20  send_monitor_(send_monitor),
21  receive_monitor_(receive_monitor),
22  packet_scheduler_(packet_scheduler)
23 {
24 }
25 
28 {
29  context_ = &(knowledge.get_context());
30 
31  if (!settings_.no_receiving)
32  {
33  // int send_buff_size = 0;
34  int rcv_buff_size = 0;
35  int timeout = 1000;
36  int buff_size = settings_.queue_length;
37  int result;
38  int zero = 0;
39  size_t opt_len = sizeof(int);
40 
41  if (settings_.debug_to_kb_prefix != "")
42  {
43  received_packets_.set_name(
44  settings_.debug_to_kb_prefix + ".received_packets", knowledge);
45  failed_receives_.set_name(
46  settings_.debug_to_kb_prefix + ".failed_receives", knowledge);
47  received_data_max_.set_name(
48  settings_.debug_to_kb_prefix + ".received_data_max", knowledge);
49  received_data_min_.set_name(
50  settings_.debug_to_kb_prefix + ".received_data_min", knowledge);
51  received_data_.set_name(
52  settings_.debug_to_kb_prefix + ".received_data", knowledge);
53  }
54 
55  // setup the receive buffer
56  if (settings_.queue_length > 0)
57  buffer_ = new char[settings_.queue_length];
58 
60  "ZMQTransportReadThread::init:"
61  " setting up read socket\n");
62 
63  read_socket_ = zmq_socket(zmq_context.get_context(), ZMQ_SUB);
64 
65  if (read_socket_ == NULL)
66  {
68  "ZMQTransportReadThread::init:"
69  " ERROR: could not create SUB socket\n");
71  "ZMQTransportReadThread::init:"
72  " ERROR: errno = %s\n",
73  zmq_strerror(zmq_errno()));
74  }
75 
76  // subscribe to all messages
77  result = zmq_setsockopt(read_socket_, ZMQ_SUBSCRIBE, 0, 0);
78 
79  if (result == 0)
80  {
82  "ZMQTransportReadThread::init:"
83  " successfully set sockopt for ZMQ_SUBSCRIBE\n");
84  }
85  else
86  {
88  "ZMQTransportReadThread::init:"
89  " ERROR: errno = %s\n",
90  zmq_strerror(zmq_errno()));
91  }
92 
93  // if you don't do this, ZMQ waits forever for no reason. Super smart.
94  result =
95  zmq_setsockopt(read_socket_, ZMQ_LINGER, (void*)&zero, sizeof(int));
96 
98  "ZMQTransportReadThread::init:"
99  " setting rcv buff size to settings.queue_length (%d)\n",
100  buff_size);
101 
102  result =
103  zmq_setsockopt(read_socket_, ZMQ_RCVBUF, (void*)&buff_size, opt_len);
104 
105  if (result == 0)
106  {
107  result = zmq_getsockopt(
108  read_socket_, ZMQ_RCVBUF, (void*)&rcv_buff_size, &opt_len);
109 
111  "ZMQTransportReadThread::init:"
112  " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
113  buff_size, rcv_buff_size);
114  }
115  else
116  {
118  "ZMQTransportReadThread::init:"
119  " ERROR: errno = %s\n",
120  zmq_strerror(zmq_errno()));
121  }
122 
123  result =
124  zmq_setsockopt(read_socket_, ZMQ_RCVTIMEO, (void*)&timeout, opt_len);
125 
126  if (result == 0)
127  {
128  result =
129  zmq_getsockopt(read_socket_, ZMQ_RCVTIMEO, (void*)&timeout, &opt_len);
130 
132  "ZMQTransportReadThread::init:"
133  " successfully set rcv timeout to %d\n",
134  timeout);
135  }
136  else
137  {
139  "ZMQTransportReadThread::init:"
140  " ERROR: When setting timeout on rcv, errno = %s\n",
141  zmq_strerror(zmq_errno()));
142  }
143 
144  if (settings_.hosts.size() >= 1)
145  {
146  // if the first host was a reliable multicast, we need to connect to it
147  if (utility::begins_with(settings_.hosts[0], "pgm") ||
148  utility::begins_with(settings_.hosts[0], "epgm"))
149  {
150  }
151  }
152 
153  // connect the reader to the host sockets
154  size_t i = 0;
155 
156  if (settings_.hosts.size() >= 1)
157  {
158  // we ignore first host unless it is pgm because writer is on first host
159  if (!utility::begins_with(settings_.hosts[0], "pgm") &&
160  !utility::begins_with(settings_.hosts[0], "epgm"))
161  {
162  ++i;
163  }
164  }
165 
166  for (; i < settings_.hosts.size(); ++i)
167  {
168  int connect_result =
169  zmq_connect(read_socket_, settings_.hosts[i].c_str());
170 
171  if (connect_result == 0)
172  {
174  "ZMQTransportReadThread::init:"
175  " successfully connected to %s\n",
176  settings_.hosts[i].c_str());
177  }
178  else
179  {
181  "ZMQTransportReadThread::init:"
182  " ERROR: could not connect to %s\n",
183  settings_.hosts[i].c_str());
185  "ZMQTransportReadThread::init:"
186  " ERROR: errno = %s\n",
187  zmq_strerror(zmq_errno()));
188  }
189  }
190  }
191 
192  if (context_)
193  {
194  // check for an on_data_received ruleset
195  if (settings_.on_data_received_logic.length() != 0)
196  {
197  madara_logger_log(this->context_->get_logger(), logger::LOG_MAJOR,
198  "ZMQTransportReadThread::init:"
199  " setting rules to %s\n",
200  settings_.on_data_received_logic.c_str());
201 
202 #ifndef _MADARA_NO_KARL_
204  on_data_received_ = context_->compile(settings_.on_data_received_logic);
205 #endif // _MADARA_NO_KARL_
206  }
207  else
208  {
209  madara_logger_log(this->context_->get_logger(), logger::LOG_MAJOR,
210  "ZMQTransportReadThread::init:"
211  " no permanent rules were set\n");
212  }
213  }
214 }
215 
217 {
219  "ZMQTransportReadThread::cleanup:"
220  " starting cleanup\n");
221 
222  if (read_socket_ != 0)
223  {
225  "ZMQTransportReadThread::cleanup:"
226  " closing read socket\n");
227 
228  int result = zmq_close(read_socket_);
229  read_socket_ = 0;
230 
231  if (result != 0)
232  {
234  "ZMQTransportReadThread::cleanup:"
235  " ERROR: errno = %s\n",
236  zmq_strerror(zmq_errno()));
237  }
238  }
239 
241  "ZMQTransportReadThread::cleanup:"
242  " finished cleanup\n");
243 }
244 
246  const char* print_prefix, MessageHeader* header,
247  const knowledge::KnowledgeMap& records)
248 {
249  int64_t buffer_remaining = (int64_t)settings_.queue_length;
250  char* buffer = buffer_.get_ptr();
251  int result(0);
252 
253  if (!settings_.no_sending)
254  {
255  result = prep_rebroadcast(*context_, buffer, buffer_remaining, settings_,
256  print_prefix, header, records, packet_scheduler_);
257 
258  if (result > 0)
259  {
260  if (settings_.hosts.size() > 0 && result > 0)
261  {
263  "ZMQTransportReadThread::send:"
264  " sending %d bytes on socket\n",
265  result);
266 
267  // send the prepped buffer over ZeroMQ
268  result = zmq_send(write_socket_, (void*)buffer_.get_ptr(),
269  (size_t)result, ZMQ_DONTWAIT);
270 
272  "ZMQTransportReadThread::send:"
273  " sent %d bytes on socket\n",
274  result);
275  }
276  }
277  }
278 }
279 
281 {
282  if (!settings_.no_receiving)
283  {
284  // allocate a buffer to send
285  char* buffer = buffer_.get_ptr();
286  const char* print_prefix = "ZMQTransportReadThread::run";
287  int64_t buffer_remaining = settings_.queue_length;
288  size_t zmq_buffer_size = buffer_remaining;
289 
291  "%s:"
292  " entering main service loop.\n",
293  print_prefix);
294 
295  knowledge::KnowledgeMap rebroadcast_records;
296 
297  if (buffer == 0)
298  {
300  "%s:"
301  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
302  print_prefix, settings_.queue_length);
303 
304  return;
305  }
306 
308  "%s:"
309  " entering a recv on the socket.\n",
310  print_prefix);
311 
312  // blocking receive up to rcv timeout (1 second)
313  buffer_remaining =
314  (int64_t)zmq_recv(read_socket_, (void*)buffer, zmq_buffer_size, 0);
315 
317  "%s:"
318  " past recv on the socket.\n",
319  print_prefix);
320 
321  if (buffer_remaining > 0)
322  {
323  if (settings_.debug_to_kb_prefix != "")
324  {
325  received_data_ += buffer_remaining;
326  ++received_packets_;
327 
328  if (received_data_max_ < buffer_remaining)
329  {
330  received_data_max_ = buffer_remaining;
331  }
332  if (received_data_min_ > buffer_remaining || received_data_min_ == 0)
333  {
334  received_data_min_ = buffer_remaining;
335  }
336  }
337 
338  MessageHeader* header = (MessageHeader*)buffer;
339 
341  "%s:"
342  " processing %d byte update from %s.\n",
343  print_prefix, (int)buffer_remaining, header->originator);
344 
345  process_received_update(buffer, (uint32_t)buffer_remaining, id_,
346  *context_, settings_, send_monitor_, receive_monitor_,
347  rebroadcast_records,
348 #ifndef _MADARA_NO_KARL_
349  on_data_received_,
350 #endif // _MADARA_NO_KARL_
351  print_prefix, header->originator, header);
352 
354  "%s:"
355  " done processing %d byte update from %s.\n",
356  print_prefix, (int)buffer_remaining, header->originator);
357 
358  if (header)
359  {
360  // delete header
361  delete header;
362  }
363  }
364  else
365  {
367  "%s:"
368  " wait timeout on new messages. Proceeding to next wait\n",
369  print_prefix);
370 
371  if (settings_.debug_to_kb_prefix != "")
372  {
373  ++failed_receives_;
374  }
375  }
376  }
377 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
const ThreadSafeContext * context_
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:43
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
Provides monitoring capability of a transport's bandwidth.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
char originator[64]
the originator of the message (host:port)
Provides scheduler for dropping packets.
Holds basic transport settings.
void * get_context()
Retrieves the underlying ZMQ context.
Definition: ZMQContext.h:46
void run(void)
The main loop internals for the read thread.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
void cleanup(void)
Cleanup function called by thread manager.
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT ZMQContext zmq_context
Definition: ZMQContext.cpp:5
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:791
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:638