MADARA  3.2.3
ZMQTransportReadThread.cpp
Go to the documentation of this file.
2 
6 #include "ZMQContext.h"
7 
8 
9 #include <iostream>
10 #include <algorithm>
11 
13  const TransportSettings & settings,
14  const std::string & id,
15  void * write_socket,
16  BandwidthMonitor & send_monitor,
17  BandwidthMonitor & receive_monitor,
18  PacketScheduler & packet_scheduler)
19  : settings_ (settings), id_ (id), context_ (0),
20  write_socket_ (write_socket),
21  read_socket_ (0),
22  send_monitor_ (send_monitor),
23  receive_monitor_ (receive_monitor),
24  packet_scheduler_ (packet_scheduler)
25 {
26 }
27 
28 void
31 {
32  context_ = &(knowledge.get_context ());
33 
35  {
36  //int send_buff_size = 0;
37  int rcv_buff_size = 0;
38  int timeout = 1000;
39  int buff_size = settings_.queue_length;
40  size_t opt_len = sizeof (int);
41 
42  // setup the receive buffer
43  if (settings_.queue_length > 0)
44  buffer_ = new char[settings_.queue_length];
45 
47  "ZMQTransportReadThread::init:" \
48  " setting up read socket\n");
49 
50  read_socket_ = zmq_socket (zmq_context.get_context (), ZMQ_SUB);
51 
52  // subscribe to all messages
53  zmq_setsockopt (read_socket_, ZMQ_SUBSCRIBE, 0, 0);
54 
56  "ZMQTransportReadThread::init:" \
57  " setting rcv buff size to settings.queue_length (%d)\n",
58  buff_size);
59 
60  int result = zmq_setsockopt (
61  read_socket_, ZMQ_RCVBUF, (void *)&buff_size, opt_len);
62 
63  if (result == 0)
64  {
65  result = zmq_getsockopt (
66  read_socket_, ZMQ_RCVBUF, (void *)&rcv_buff_size, &opt_len);
67 
69  "ZMQTransportReadThread::init:" \
70  " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
71  buff_size, rcv_buff_size);
72  }
73  else
74  {
76  "ZMQTransportReadThread::init:" \
77  " ERROR: errno = %s\n",
78  zmq_strerror (zmq_errno ()));
79  }
80 
81 
82  result = zmq_setsockopt (
83  read_socket_, ZMQ_RCVTIMEO, (void *)&timeout, opt_len);
84 
85  if (result == 0)
86  {
87  result = zmq_getsockopt (
88  read_socket_, ZMQ_RCVTIMEO, (void *)&timeout, &opt_len);
89 
91  "ZMQTransportReadThread::init:" \
92  " successfully set rcv timeout to %d\n",
93  timeout);
94  }
95  else
96  {
98  "ZMQTransportReadThread::init:" \
99  " ERROR: When setting timeout on rcv, errno = %s\n",
100  zmq_strerror (zmq_errno ()));
101  }
102 
103 
104  if (settings_.hosts.size () >= 1)
105  {
106  // if the first host was a reliable multicast, we need to connect to it
107  if (utility::begins_with (settings_.hosts[0], "pgm") ||
108  utility::begins_with (settings_.hosts[0], "epgm"))
109  {
110 
111  }
112  }
113 
114  // connect the reader to the host sockets
115  size_t i = 0;
116 
117  if (settings_.hosts.size () >= 1)
118  {
119  // we ignore first host unless it is pgm because writer is on first host
120  if (!utility::begins_with (settings_.hosts[0], "pgm") &&
121  !utility::begins_with (settings_.hosts[0], "epgm"))
122  {
123  ++i;
124  }
125  }
126 
127  for (; i < settings_.hosts.size (); ++i)
128  {
129  int connect_result = zmq_connect (
130  read_socket_, settings_.hosts[i].c_str ());
131 
132  if (connect_result == 0)
133  {
135  "ZMQTransportReadThread::init:" \
136  " successfully connected to %s\n",
137  settings_.hosts[i].c_str ());
138  }
139  else
140  {
142  "ZMQTransportReadThread::init:" \
143  " ERROR: could not connect to %s\n",
144  settings_.hosts[i].c_str ());
146  "ZMQTransportReadThread::init:" \
147  " ERROR: errno = %s\n",
148  zmq_strerror (zmq_errno ()));
149  }
150  }
151  }
152 
153  if (context_)
154  {
155  // check for an on_data_received ruleset
156  if (settings_.on_data_received_logic.length () != 0)
157  {
159  "ZMQTransportReadThread::init:" \
160  " setting rules to %s\n",
162 
163 
164 #ifndef _MADARA_NO_KARL_
167 #endif // _MADARA_NO_KARL_
168  }
169  else
170  {
172  "ZMQTransportReadThread::init:" \
173  " no permanent rules were set\n");
174  }
175  }
176 }
177 
178 void
180 {
182  "ZMQTransportReadThread::cleanup:" \
183  " starting cleanup\n");
184 
185  if (read_socket_ != 0)
186  {
188  "ZMQTransportReadThread::cleanup:" \
189  " closing read socket\n");
190 
191  int option = 0;
192  // if you don't do this, ZMQ waits forever for no reason. Super smart.
193  zmq_setsockopt (read_socket_, ZMQ_LINGER, (void *)&option, sizeof (int));
194 
195  madara::utility::sleep (0.100);
196 
197  zmq_close (read_socket_);
198  }
199 
201  "ZMQTransportReadThread::cleanup:" \
202  " finished cleanup\n");
203 
204 }
205 
206 void
208  const char * print_prefix,
209  MessageHeader * header,
210  const knowledge::KnowledgeMap & records)
211 {
212  int64_t buffer_remaining = (int64_t)settings_.queue_length;
213  char * buffer = buffer_.get_ptr ();
214  int result (0);
215 
216  if (!settings_.no_sending)
217  {
218  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
219  settings_, print_prefix,
220  header, records,
222 
223  if (result > 0)
224  {
225  if (settings_.hosts.size () > 0 && result > 0)
226  {
228  "ZMQTransportReadThread::send:" \
229  " sending %d bytes on socket\n", result);
230 
231  //send the prepped buffer over ZeroMQ
232  result = zmq_send (
233  write_socket_, (void *)buffer_.get_ptr (), (size_t)result, ZMQ_DONTWAIT);
234 
236  "ZMQTransportReadThread::send:" \
237  " sent %d bytes on socket\n", result);
238  }
239  }
240  }
241 }
242 
243 void
245 {
246  if (!settings_.no_receiving)
247  {
248  // allocate a buffer to send
249  char * buffer = buffer_.get_ptr ();
250  const char * print_prefix = "ZMQTransportReadThread::run";
251  int64_t buffer_remaining = settings_.queue_length;
252  size_t zmq_buffer_size = buffer_remaining;
253 
255  "%s:" \
256  " entering main service loop.\n",
257  print_prefix);
258 
259  knowledge::KnowledgeMap rebroadcast_records;
260 
261  if (buffer == 0)
262  {
264  "%s:" \
265  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
266  print_prefix,
268 
269  return;
270  }
271 
273  "%s:" \
274  " entering a recv on the socket.\n",
275  print_prefix);
276 
277  // blocking receive up to rcv timeout (1 second)
278  buffer_remaining = (int64_t)zmq_recv (
279  read_socket_, (void *)buffer, zmq_buffer_size, 0);
280 
282  "%s:" \
283  " past recv on the socket.\n",
284  print_prefix);
285 
286  if (buffer_remaining > 0)
287  {
288  MessageHeader * header = (MessageHeader *)buffer;
289 
291  "%s:" \
292  " processing %d byte update from %s.\n",
293  print_prefix, (int)buffer_remaining, header->originator);
294 
295  process_received_update (buffer, (uint32_t)buffer_remaining, id_, *context_,
296  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
297 #ifndef _MADARA_NO_KARL_
299 #endif // _MADARA_NO_KARL_
300  print_prefix,
301  header->originator, header);
302 
304  "%s:" \
305  " done processing %d byte update from %s.\n",
306  print_prefix, (int)buffer_remaining, header->originator);
307 
308  if (header)
309  {
310  // delete header
311  delete header;
312  }
313  }
314  else
315  {
317  "%s:" \
318  " wait timeout on new messages. Proceeding to next wait\n",
319  print_prefix);
320  }
321  }
322 }
const QoSTransportSettings settings_
quality-of-service transport settings
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
const std::string id_
host:port identifier of this process
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
void * read_socket_
The multicast socket we are reading from.
knowledge::ThreadSafeContext * context_
knowledge context
MADARA_EXPORT ZMQContext zmq_context
Definition: ZMQContext.cpp:5
void * write_socket_
underlying socket for sending
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
void cleanup(void)
Cleanup function called by thread manager.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:619
static struct madara::knowledge::tags::string_t string
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
std::string on_data_received_logic
logic to be evaluated after every successful update
char originator[64]
the originator of the message (host:port)
uint32_t queue_length
Length of the buffer used to store history of events.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
Provides monitoring capability of a transport&#39;s bandwidth.
void * get_context()
Retrieves the underlying ZMQ context.
Definition: ZMQContext.h:46
Provides functions and classes for the distributed knowledge base.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:317
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
void run(void)
The main loop internals for the read thread.
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:737
madara::utility::ScopedArray< char > buffer_
buffer for receiving