11 namespace madara {
namespace transport {
39 "UdpTransport::setup_read_thread:" \
40 " Starting UdpTransport read thread: %s\n", name.c_str ());
57 socket_.bind(udp::endpoint (ip::address_v4::any (),
61 "UdpTransport::setup_read_socket:" \
62 " Bound to port: %d\n", (int)
addresses_[0].port ());
63 }
catch (
const boost::system::system_error &e) {
65 "UdpTransport::setup:" \
66 " Error setting up read socket: %s\n", e.what ());
87 const udp::endpoint &target,
const char *buf,
size_t size)
89 uint64_t bytes_sent = 0;
91 int send_attempts = -1;
92 ssize_t actual_sent = -1;
94 while (actual_sent < 0 &&
101 actual_sent =
socket_.send_to (
102 asio::buffer(buf, size), target);
103 }
catch (
const boost::system::system_error &e) {
105 "UdpTransport::send_buffer:" \
106 " Error sending packet to %s:%d: %s\n",
107 target.address ().to_string ().c_str (), (int)target.port (),
116 "UdpTransport::send_buffer: Sent %d byte packet to %s:%d\n",
118 target.address ().to_string ().c_str (),
119 (int)target.port ());
121 bytes_sent += actual_sent;
125 return (
long)bytes_sent;
131 static const char print_prefix[] =
"UdpTransport::send_message";
133 uint64_t bytes_sent = 0;
141 " fragmenting %" PRIu64
" byte packet (%" PRIu32
" bytes is max fragment size)\n",
148 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
152 " Sending fragment %d\n",
170 " Sent fragments totalling %" PRIu64
" bytes\n",
171 print_prefix, bytes_sent);
179 " Sending packet of size %ld\n",
180 print_prefix, packet_size);
184 size_t addr_index = &address - &*addresses_.begin ();
189 " Deciding to send to %s:%d (index %d): %d\n",
191 address.address ().to_string ().c_str (),
192 (int)address.port (),
193 addr_index, should_send);
196 bytes_sent +=
send_buffer(address, buf, (
size_t)packet_size);
209 " Send bandwidth = %" PRIu64
" B/s\n",
212 return (
long) bytes_sent;
220 const char * print_prefix =
"UdpTransport::send_data";
224 result =
prep_send (orig_updates, print_prefix);
QoSTransportSettings settings_
long send_message(const char *buf, size_t size)
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
int setup_write_socket() override
UdpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
udp::socket socket_
underlying socket
This class stores variables and their values for use by any entity needing state information in a thr...
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
virtual bool pre_send_buffer(size_t addr_index)
int setup_read_socket() override
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
T * get_ptr(void)
get the underlying pointer
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
double slack_time
time to sleep between sends and rebroadcasts
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int reliability(void) const
Accesses reliability setting.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
threads::Threader read_threads_
threads for reading knowledge updates
int setup() override
all subclasses should call this method at the end of its setup
int setup_read_thread(double hertz, const std::string &name) override
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
long send_data(const madara::knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
Thread for reading knowledge updates through a UDP socket.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int resend_attempts
Maximum number of attempts to resend if transport is busy.
MADARA_EXPORT void frag(const char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
virtual int setup_read_socket()
virtual int setup_write_socket()