19 :
Base (id, config, context), write_socket_ (0)
53 zmq_setsockopt (
write_socket_, ZMQ_LINGER, (
void *)&option,
sizeof (
int));
61 "ZMQTransport::close:" \
62 " calling terminate on read threads\n");
67 "ZMQTransport::close:" \
68 " waiting on read threads\n");
73 "ZMQTransport::close:" \
74 " waiting on read threads\n");
100 "ZMQTransport::setup:" \
101 " setting up write socket\n");
106 "ZMQTransport::setup:" \
107 " binding write to %s\n",
110 int bind_result = zmq_bind (write_socket_,
settings_.
hosts[0].c_str ());
112 if (bind_result != 0)
115 "ZMQTransport::setup:" \
116 " ERROR: could not bind to %s\n",
119 "ZMQTransport::setup:" \
120 " ERROR: errno = %s\n",
121 zmq_strerror (zmq_errno ()));
126 "ZMQTransport::setup:" \
127 " successfully bound to %s\n",
132 int send_buff_size = 0;
136 size_t opt_len =
sizeof (int);
139 "ZMQTransport::setup:" \
140 " setting send buff size to settings.queue_length (%d)\n",
143 int result = zmq_setsockopt (
144 write_socket_, ZMQ_SNDBUF, (
void *)&buff_size, opt_len);
148 result = zmq_getsockopt (
149 write_socket_, ZMQ_SNDBUF, (
void *)&send_buff_size, &opt_len);
152 "ZMQTransport::setup:" \
153 " successfully set sockopt sendbuf size to %d. Actual %d allocated\n",
154 buff_size, send_buff_size);
159 "ZMQTransport::setup:" \
160 " ERROR: errno = %s\n",
161 zmq_strerror (zmq_errno ()));
165 result = zmq_setsockopt (
166 write_socket_, ZMQ_SNDTIMEO, (
void *)&timeout, opt_len);
170 result = zmq_getsockopt (
171 write_socket_, ZMQ_SNDTIMEO, (
void *)&timeout, &opt_len);
174 "ZMQTransport::setup:" \
175 " successfully set send timeout to %d\n",
181 "ZMQTransport::setup:" \
182 " ERROR: When setting timeout on send, errno = %s\n",
183 zmq_strerror (zmq_errno ()));
195 "ZMQTransport::setup:" \
201 std::stringstream thread_name;
202 thread_name <<
"read";
221 const char * print_prefix =
"ZMQTransport::send_data";
225 result =
prep_send (orig_updates, print_prefix);
230 "ZMQTransport::send:" \
231 " sending %d bytes on socket\n", (int)result);
234 result = (long) zmq_send (
238 "ZMQTransport::send:" \
239 " sent %d bytes on socket\n", (int)result);
QoSTransportSettings settings_
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
void rem_ref(void)
Removes a reference to the context.
MADARA_EXPORT ZMQContext zmq_context
virtual int setup(void) override
Initializes the transport.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
void * write_socket_
underlying socket for sending
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
virtual void close(void) override
Closes the transport.
void add_ref(void)
Adds a reference to the context.
bool no_sending
if true, never send over transport
int reliability(void) const
Accesses reliability setting.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
long send_data(const madara::knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
const std::string id_
host:port identifier of this process
Thread for reading knowledge updates through a ZMQ datagram socket.
uint32_t read_threads
the number of read threads to start
virtual ~ZMQTransport()
Destructor.
threads::Threader read_threads_
threads for reading knowledge updates
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
void * get_context()
Retrieves the underlying ZMQ context.
void set_data_plane(knowledge::KnowledgeBase data_plane)
Sets the data plane for new threads.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of their setup
Base class from which all transports must be derived.
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
ZMQTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.