18 bool launch_transport)
19 :
Base(id, config, context), write_socket_(0)
56 this->invalidate_transport();
58 if (write_socket_ != 0)
60 int result = zmq_close(write_socket_);
66 "ZMQTransport::close:"
67 " ERROR: errno = %s\n",
68 zmq_strerror(zmq_errno()));
73 "ZMQTransport::close:"
74 " calling terminate on read threads\n");
76 read_threads_.terminate();
79 "ZMQTransport::close:"
80 " waiting on read threads\n");
85 "ZMQTransport::close:"
86 " waiting on read threads\n");
104 if (settings_.hosts.size() > 0)
107 "ZMQTransport::setup:"
108 " setting up write socket\n");
112 if (write_socket_ == NULL)
115 "ZMQTransport::setup:"
116 " ERROR: could not create PUB socket\n");
118 "ZMQTransport::setup:"
119 " ERROR: errno = %s\n",
120 zmq_strerror(zmq_errno()));
123 for (
size_t i = 0; i < settings_.hosts.size(); ++i)
132 "ZMQTransport::setup:"
133 " converting incorrect host format to tcp://%s\n",
134 settings_.hosts[i].c_str());
136 settings_.hosts[i] =
"tcp://" + settings_.hosts[i];
141 "ZMQTransport::setup:"
142 " binding write to %s\n",
143 settings_.hosts[0].c_str());
145 int bind_result = zmq_bind(write_socket_, settings_.hosts[0].c_str());
147 if (bind_result != 0)
150 "ZMQTransport::setup:"
151 " ERROR: could not bind to %s\n",
152 settings_.hosts[0].c_str());
154 "ZMQTransport::setup:"
155 " ERROR: errno = %s\n",
156 zmq_strerror(zmq_errno()));
161 "ZMQTransport::setup:"
162 " successfully bound to %s\n",
163 settings_.hosts[0].c_str());
166 int send_buff_size = 0;
168 int buff_size = settings_.queue_length;
171 size_t opt_len =
sizeof(int);
174 "ZMQTransport::setup:"
175 " setting send buff size to settings.queue_length (%d)\n",
179 zmq_setsockopt(write_socket_, ZMQ_SNDBUF, (
void*)&buff_size, opt_len);
183 result = zmq_getsockopt(
184 write_socket_, ZMQ_SNDBUF, (
void*)&send_buff_size, &opt_len);
187 "ZMQTransport::setup:"
188 " successfully set sockopt sendbuf size to %d. Actual %d allocated\n",
189 buff_size, send_buff_size);
194 "ZMQTransport::setup:"
195 " ERROR: errno = %s\n",
196 zmq_strerror(zmq_errno()));
201 zmq_setsockopt(write_socket_, ZMQ_LINGER, (
void*)&zero,
sizeof(
int));
204 zmq_setsockopt(write_socket_, ZMQ_SNDTIMEO, (
void*)&timeout, opt_len);
208 result = zmq_getsockopt(
209 write_socket_, ZMQ_SNDTIMEO, (
void*)&timeout, &opt_len);
212 "ZMQTransport::setup:"
213 " successfully set send timeout to %d\n",
219 "ZMQTransport::setup:"
220 " ERROR: When setting timeout on send, errno = %s\n",
221 zmq_strerror(zmq_errno()));
224 if (!settings_.no_receiving)
226 double hertz = settings_.read_thread_hertz;
233 "ZMQTransport::setup:"
234 " starting %d threads at %f hertz\n",
235 settings_.read_threads, hertz);
237 for (uint32_t i = 0; i < settings_.read_threads; ++i)
239 std::stringstream thread_name;
240 thread_name <<
"read";
243 read_threads_.run(hertz, thread_name.str(),
245 send_monitor_, receive_monitor_, packet_scheduler_));
250 return this->validate_transport();
257 const char* print_prefix =
"ZMQTransport::send_data";
259 if (!settings_.no_sending)
261 result = prep_send(orig_updates, print_prefix);
263 if (settings_.hosts.size() > 0 && result > 0)
266 "ZMQTransport::send:"
267 " sending %d bytes on socket\n",
271 result = (long)zmq_send(
272 write_socket_, (
void*)buffer_.get_ptr(), (size_t)result, 0);
276 if (settings_.debug_to_kb_prefix !=
"")
278 sent_data_ += result;
280 if (sent_data_max_ < result)
282 sent_data_max_ = result;
284 if (sent_data_min_ > result || sent_data_min_ == 0)
286 sent_data_min_ = result;
291 "ZMQTransport::send:"
292 " sent %d bytes on socket\n",
298 "ZMQTransport::send:"
299 " failed to send message. Error code %d\n",
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
const ThreadSafeContext * context_
This class provides a distributed knowledge base to users.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thr...
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Base class from which all transports must be derived.
virtual int setup(void)
All subclasses should call this method at the end of their setup.
Holds basic transport settings.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
void rem_ref(void)
Removes a reference to the context.
void * get_context()
Retrieves the underlying ZMQ context.
void add_ref(void)
Adds a reference to the context.
Thread for reading knowledge updates through a ZMQ datagram socket.
virtual void close(void) override
Closes the transport.
virtual int setup(void) override
Initializes the transport.
virtual ~ZMQTransport()
Destructor.
knowledge::containers::Integer failed_sends_
failed sends
knowledge::containers::Integer sent_data_min_
min data sent
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
threads::Threader read_threads_
threads for reading knowledge updates
knowledge::containers::Integer sent_data_max_
max data sent
knowledge::containers::Integer sent_data_
sent data
int reliability(void) const
Accesses reliability setting.
knowledge::containers::Integer sent_packets_
sent packets
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
ZMQTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT ZMQContext zmq_context
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.