MADARA  3.4.1
ZMQTransport.cpp
Go to the documentation of this file.
1 
5 
11 
12 #include <iostream>
14 #include "ZMQContext.h"
15 
// ZMQTransport constructor. The first line of the signature is not present in
// this listing; only the trailing launch_transport parameter is visible.
// Forwards id, config and context to Base and starts with a null write
// socket. When launch_transport is true, setup() is invoked right away.
18  bool launch_transport)
19  : Base(id, config, context), write_socket_(0)
20 {
21  // create a reference to the knowledge base for threading
22  knowledge_.use(context);
23 
24  // set the data plane for the read threads
    // NOTE(review): the statement belonging to the comment above (listing
    // line 25) is missing from this listing -- presumably a set_data_plane
    // call on read_threads_; confirm against the real file.
26 
27  // keep track of references to context so management is automatic
    // NOTE(review): listing line 28 is missing as well -- presumably adds a
    // reference to the shared ZMQ context; confirm against the real file.
29 
    // only bring up the socket and read threads now if the caller asked for it
30  if (launch_transport)
31  setup();
32 
    // an empty debug_to_kb_prefix leaves the send-statistics containers unbound
33  if (config.debug_to_kb_prefix != "")
34  {
    // NOTE(review): the declaration of kb (listing line 35) is missing here;
    // kb is pointed at the caller's context before the counters are named
36  kb.use(context);
37 
    // bind each send-statistics counter to "<prefix>.<counter name>" inside kb
38  sent_packets_.set_name(config.debug_to_kb_prefix + ".sent_packets", kb);
39  failed_sends_.set_name(config.debug_to_kb_prefix + ".failed_sends", kb);
40  sent_data_max_.set_name(config.debug_to_kb_prefix + ".sent_data_max", kb);
41  sent_data_min_.set_name(config.debug_to_kb_prefix + ".sent_data_min", kb);
42  sent_data_.set_name(config.debug_to_kb_prefix + ".sent_data", kb);
43  }
44 }
45 
// Body of what is presumably the ZMQTransport destructor (its signature line
// is missing from this listing). Shuts the transport down through close().
47 {
48  close();
49 
50  // destroy context if it has no references anymore
    // NOTE(review): the statement the comment above describes (listing line
    // 51) is missing here -- presumably releases this transport's reference
    // on the shared ZMQ context; confirm against the real file.
52 }
53 
55 {
56  this->invalidate_transport();
57 
58  if (write_socket_ != 0)
59  {
60  int result = zmq_close(write_socket_);
61  write_socket_ = 0;
62 
63  if (result != 0)
64  {
66  "ZMQTransport::close:"
67  " ERROR: errno = %s\n",
68  zmq_strerror(zmq_errno()));
69  }
70  }
71 
73  "ZMQTransport::close:"
74  " calling terminate on read threads\n");
75 
76  read_threads_.terminate();
77 
79  "ZMQTransport::close:"
80  " waiting on read threads\n");
81 
82  read_threads_.wait();
83 
85  "ZMQTransport::close:"
86  " waiting on read threads\n");
87 }
88 
90 {
92 }
93 
95 {
97 }
98 
// Body of ZMQTransport::setup (signature line and the heads of the logging
// calls are missing from this listing).
//
// Runs Base::setup() and then, only when at least one host is configured:
//   - creates a ZMQ_PUB write socket from the shared zmq_context,
//   - prefixes "tcp://" to every host that lacks a recognized scheme,
//   - binds the write socket to hosts[0],
//   - applies the send-buffer, linger and send-timeout socket options,
//   - unless settings_.no_receiving is set, starts settings_.read_threads
//     read threads.
// Returns whatever validate_transport() returns.
100 {
101  // call base setup method to initialize certain common variables
102  Base::setup();
103 
     // with no hosts configured, no socket or thread is created and control
     // falls straight through to validate_transport()
104  if (settings_.hosts.size() > 0)
105  {
107  "ZMQTransport::setup:"
108  " setting up write socket\n");
109 
110  write_socket_ = zmq_socket(zmq_context.get_context(), ZMQ_PUB);
111 
     // NOTE(review): a failed socket creation is only logged; execution
     // continues and the null socket is still passed to zmq_bind and
     // zmq_setsockopt below.
112  if (write_socket_ == NULL)
113  {
115  "ZMQTransport::setup:"
116  " ERROR: could not create PUB socket\n");
118  "ZMQTransport::setup:"
119  " ERROR: errno = %s\n",
120  zmq_strerror(zmq_errno()));
121  }
122 
     // normalize host strings in place: any entry that does not start with one
     // of the schemes checked below is rewritten as "tcp://" + entry
123  for (size_t i = 0; i < settings_.hosts.size(); ++i)
124  {
125  if (!utility::begins_with(settings_.hosts[i], "tcp://") &&
126  !utility::begins_with(settings_.hosts[i], "ipc://") &&
127  !utility::begins_with(settings_.hosts[i], "inproc://") &&
128  !utility::begins_with(settings_.hosts[i], "pgm://") &&
129  !utility::begins_with(settings_.hosts[i], "epgm://"))
130  {
132  "ZMQTransport::setup:"
133  " converting incorrect host format to tcp://%s\n",
134  settings_.hosts[i].c_str());
135 
136  settings_.hosts[i] = "tcp://" + settings_.hosts[i];
137  }
138  }
139 
141  "ZMQTransport::setup:"
142  " binding write to %s\n",
143  settings_.hosts[0].c_str());
144 
     // only the first host is bound for publishing; the remaining entries are
     // not used again inside this function except by being part of settings_
145  int bind_result = zmq_bind(write_socket_, settings_.hosts[0].c_str());
146 
     // a bind failure is logged but does not abort the rest of setup
147  if (bind_result != 0)
148  {
150  "ZMQTransport::setup:"
151  " ERROR: could not bind to %s\n",
152  settings_.hosts[0].c_str());
154  "ZMQTransport::setup:"
155  " ERROR: errno = %s\n",
156  zmq_strerror(zmq_errno()));
157  }
158  else
159  {
161  "ZMQTransport::setup:"
162  " successfully bound to %s\n",
163  settings_.hosts[0].c_str());
164  }
165 
     // send_buff_size receives the value read back from the socket after the
     // requested size (buff_size) has been applied
166  int send_buff_size = 0;
167  // int rcv_buff_size = 0;
168  int buff_size = settings_.queue_length;
     // value handed to ZMQ_SNDTIMEO below
169  int timeout = 300;
     // value handed to ZMQ_LINGER below
170  int zero = 0;
     // in/out length argument shared by the set/getsockopt calls
171  size_t opt_len = sizeof(int);
172 
174  "ZMQTransport::setup:"
175  " setting send buff size to settings.queue_length (%d)\n",
176  buff_size);
177 
178  int result =
179  zmq_setsockopt(write_socket_, ZMQ_SNDBUF, (void*)&buff_size, opt_len);
180 
181  if (result == 0)
182  {
     // read the option back so the log can show requested vs. actual size;
     // NOTE(review): the getsockopt result itself is not checked
183  result = zmq_getsockopt(
184  write_socket_, ZMQ_SNDBUF, (void*)&send_buff_size, &opt_len);
185 
187  "ZMQTransport::setup:"
188  " successfully set sockopt sendbuf size to %d. Actual %d allocated\n",
189  buff_size, send_buff_size);
190  }
191  else
192  {
194  "ZMQTransport::setup:"
195  " ERROR: errno = %s\n",
196  zmq_strerror(zmq_errno()));
197  }
198 
199  // if you don't do this, ZMQ waits forever for no reason. Super smart.
     // NOTE(review): the result of setting ZMQ_LINGER is overwritten by the
     // next assignment without being inspected, so a failure here is silent.
200  result =
201  zmq_setsockopt(write_socket_, ZMQ_LINGER, (void*)&zero, sizeof(int));
202 
203  result =
204  zmq_setsockopt(write_socket_, ZMQ_SNDTIMEO, (void*)&timeout, opt_len);
205 
206  if (result == 0)
207  {
     // timeout is overwritten with the value the socket reports back;
     // NOTE(review): the getsockopt result itself is not checked
208  result = zmq_getsockopt(
209  write_socket_, ZMQ_SNDTIMEO, (void*)&timeout, &opt_len);
210 
212  "ZMQTransport::setup:"
213  " successfully set send timeout to %d\n",
214  timeout);
215  }
216  else
217  {
219  "ZMQTransport::setup:"
220  " ERROR: When setting timeout on send, errno = %s\n",
221  zmq_strerror(zmq_errno()));
222  }
223 
     // read threads are started only inside the hosts.size() > 0 branch and
     // only when receiving has not been disabled
224  if (!settings_.no_receiving)
225  {
     // a negative configured rate is clamped to 0.0 before being passed on
226  double hertz = settings_.read_thread_hertz;
227  if (hertz < 0.0)
228  {
229  hertz = 0.0;
230  }
231 
233  "ZMQTransport::setup:"
234  " starting %d threads at %f hertz\n",
235  settings_.read_threads, hertz);
236 
237  for (uint32_t i = 0; i < settings_.read_threads; ++i)
238  {
     // thread names are "read0", "read1", ...
239  std::stringstream thread_name;
240  thread_name << "read";
241  thread_name << i;
242 
     // each thread object is heap-allocated and handed to read_threads_;
     // NOTE(review): who deletes it is not visible here -- presumably the
     // Threader takes ownership; confirm.
243  read_threads_.run(hertz, thread_name.str(),
244  new ZMQTransportReadThread(settings_, id_, write_socket_,
245  send_monitor_, receive_monitor_, packet_scheduler_));
246  }
247  }
248  }
249 
250  return this->validate_transport();
251 }
252 
// ZMQTransport::send_data (the first signature line and the heads of the
// logging calls are missing from this listing).
//
// orig_updates: the knowledge updates handed to prep_send for packaging.
//
// Returns the last value assigned to result:
//   - 0 when settings_.no_sending is set,
//   - prep_send's return value when no send is attempted (no hosts, or
//     prep_send returned a non-positive value),
//   - otherwise the return value of zmq_send.
254  const madara::knowledge::KnowledgeMap& orig_updates)
255 {
256  long result(0);
     // prefix passed to prep_send; note that the log strings written directly
     // in this function use "ZMQTransport::send:" instead
257  const char* print_prefix = "ZMQTransport::send_data";
258 
259  if (!settings_.no_sending)
260  {
     // result is used below as the number of bytes to send from buffer_
261  result = prep_send(orig_updates, print_prefix);
262 
     // send only if a host is configured and prep_send returned a positive size
263  if (settings_.hosts.size() > 0 && result > 0)
264  {
266  "ZMQTransport::send:"
267  " sending %d bytes on socket\n",
268  (int)result);
269 
270  // send the prepped buffer over ZeroMQ with timeout of 300ms
     // result is reused: from here on it holds zmq_send's return value
271  result = (long)zmq_send(
272  write_socket_, (void*)buffer_.get_ptr(), (size_t)result, 0);
273 
274  if (result > 0)
275  {
     // statistics are maintained only when a debug prefix is configured
276  if (settings_.debug_to_kb_prefix != "")
277  {
278  sent_data_ += result;
279  ++sent_packets_;
280  if (sent_data_max_ < result)
281  {
282  sent_data_max_ = result;
283  }
     // a stored minimum of 0 is treated as "not recorded yet"
284  if (sent_data_min_ > result || sent_data_min_ == 0)
285  {
286  sent_data_min_ = result;
287  }
288  }
289 
291  "ZMQTransport::send:"
292  " sent %d bytes on socket\n",
293  (int)result);
294  }
295  else
296  {
     // NOTE(review): this branch also covers a return value of 0. The value
     // logged as "Error code" is zmq_send's return value rather than
     // zmq_errno(), and failed_sends_ is not updated anywhere in the visible
     // code of this function -- confirm whether that is intended.
298  "ZMQTransport::send:"
299  " failed to send message. Error code %d\n",
300  (int)result);
301  }
302  }
303  }
304 
305  return result;
306 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
const ThreadSafeContext * context_
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:54
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:168
Base class from which all transports must be derived.
Definition: Transport.h:46
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:32
Holds basic transport settings.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
void rem_ref(void)
Removes a reference to the context.
Definition: ZMQContext.cpp:60
void * get_context()
Retrieves the underlying ZMQ context.
Definition: ZMQContext.h:46
void add_ref(void)
Adds a reference to the context.
Definition: ZMQContext.cpp:37
Thread for reading knowledge updates through a ZMQ datagram socket.
virtual void close(void) override
Closes the transport.
virtual int setup(void) override
Initializes the transport.
virtual ~ZMQTransport()
Destructor.
knowledge::containers::Integer failed_sends_
failed sends
Definition: ZMQTransport.h:107
knowledge::containers::Integer sent_data_min_
min data sent
Definition: ZMQTransport.h:116
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
threads::Threader read_threads_
threads for reading knowledge updates
Definition: ZMQTransport.h:98
knowledge::containers::Integer sent_data_max_
max data sent
Definition: ZMQTransport.h:113
knowledge::containers::Integer sent_data_
sent data
Definition: ZMQTransport.h:110
int reliability(void) const
Accesses reliability setting.
knowledge::containers::Integer sent_packets_
sent packets
Definition: ZMQTransport.h:104
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
Definition: ZMQTransport.h:95
ZMQTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
constexpr string_t string
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT ZMQContext zmq_context
Definition: ZMQContext.cpp:5
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:638