17 bool launch_transport)
19 enforcer_(1 / config.max_send_hertz)
50 "UdpTransport::setup_read_thread:"
51 " Starting UdpTransport read thread: %s\n",
72 "UdpTransport::setup_read_socket:"
73 " Bound to port: %d\n",
76 catch (
const boost::system::system_error& e)
79 "UdpTransport::setup:"
80 " Error setting up read socket: %s\n",
102 char* buf,
size_t & bytes_read, udp::endpoint& remote)
104 boost::system::error_code err;
105 bytes_read =
socket_.receive_from(
107 udp::socket::message_flags{}, err);
109 if (err == asio::error::would_block || bytes_read == 0)
112 "UdpTransport::receive_buffer: "
113 " no bytes to read. Proceeding to next wait\n");
120 "UdpTransport::receive_buffer: unexpected error: %s. "
121 "Proceeding to next wait\n",
122 err.message().c_str());
131 const udp::endpoint& target,
const char* buf,
size_t size)
133 uint64_t bytes_sent = 0;
135 int send_attempts = -1;
136 ssize_t actual_sent = -1;
149 actual_sent =
socket_.send_to(asio::buffer(buf, size), target);
151 catch (
const boost::system::system_error& e)
154 "UdpTransport::send_buffer:"
155 " Error sending packet to %s:%d: %s\n",
156 target.address().to_string().c_str(), (
int)target.port(), e.what());
171 "UdpTransport::send_buffer: Sent %d byte packet to %s:%d\n",
172 (
int)actual_sent, target.address().to_string().c_str(),
175 bytes_sent += actual_sent;
199 return (
long)bytes_sent;
205 static const char print_prefix[] =
"UdpTransport::send_message";
207 uint64_t bytes_sent = 0;
215 " fragmenting %" PRIu64
" byte packet (%" PRIu32
216 " bytes is max fragment size)\n",
225 for(FragmentMap::iterator i = map.begin(); i != map.end(); ++i, ++j)
229 " Sending fragment %d\n",
237 address, i->second.get(),
249 " Sent fragments totalling %" PRIu64
" bytes\n",
250 print_prefix, bytes_sent);
258 " Sending packet of size %ld\n",
259 print_prefix, packet_size);
263 size_t addr_index = &address - &*
addresses_.begin();
268 " Deciding to send to %s:%d (index %d): %d\n",
269 print_prefix, address.address().to_string().c_str(),
270 (
int)address.port(), addr_index, should_send);
274 bytes_sent +=
send_buffer(address, buf, (
size_t)packet_size);
286 " Send bandwidth = %" PRIu64
" B/s\n",
289 return (
long)bytes_sent;
296 const char* print_prefix =
"UdpTransport::send_data";
300 result =
prep_send(orig_updates, print_prefix);
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
This class provides a distributed knowledge base to users.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thr...
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
void run(const std::string &name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
void add(uint64_t size)
Adds a message to the monitor.
madara::utility::ScopedArray< char > buffer_
buffer for sending
QoSTransportSettings settings_
madara::knowledge::ThreadSafeContext & context_
const std::string id_
host:port identifier of this process
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
long prep_send(const knowledge::KnowledgeMap &orig_updates, const char *print_prefix)
Preps a message for sending.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
threads::Threader read_threads_
threads for reading knowledge updates
virtual int setup_write_socket()
virtual int setup_read_socket()
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
udp::socket socket_
underlying socket
int setup() override
all subclasses should call this method at the end of its setup
Holds basic transport settings.
double max_send_hertz
Maximum rate of sending messages.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
uint32_t queue_length
Length of the buffer used to store history of events.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
int resend_attempts
Maximum number of attempts to resend if transport is busy.
double slack_time
Time to sleep between sends and rebroadcasts.
knowledge::containers::Integer sent_packets
sent packets
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
int setup_write_socket() override
int reliability(void) const
Accesses reliability setting.
knowledge::containers::Integer sent_data_min
min data sent
virtual bool pre_send_buffer(size_t addr_index)
utility::EpochEnforcer< utility::Clock > enforcer_
enforces epochs when user specifies a max_send_hertz
knowledge::containers::Integer sent_data
sent data
int setup_read_socket() override
long receive_buffer(char *buf, size_t &bytes_read, udp::endpoint &remote)
Receives a buffer from a remote host.
friend class UdpTransportReadThread
long send_message(const char *buf, size_t size, uint64_t clock)
int setup_read_thread(double hertz, const std::string &name) override
knowledge::containers::Integer failed_sends
failed sends
UdpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Sends a buffer to a host endpoint.
knowledge::containers::Integer sent_data_max
max data sent
void sleep_until_next(void)
Sleeps until the next epoch.
T * get_ptr(void)
get the underlying pointer
::std::map< std::string, KnowledgeRecord > KnowledgeMap
std::map< uint32_t, utility::ScopedArray< const char > > FragmentMap
Map of fragment identifiers to fragments.
MADARA_EXPORT void frag(const char *source, uint64_t total_size, const char *originator, const char *domain, uint64_t clock, uint64_t timestamp, uint32_t quality, unsigned char ttl, uint64_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Copyright(c) 2020 Galois.