MADARA  3.4.1
UdpTransport.cpp
Go to the documentation of this file.
4 
8 
9 #include <iostream>
10 
11 namespace madara
12 {
13 namespace transport
14 {
17  bool launch_transport)
18  : BasicASIOTransport(id, context, config),
// NOTE(review): the enforcer period is the floating-point quotient
// 1 / config.max_send_hertz, computed with no visible guard. A zero or
// negative max_send_hertz gives an infinite or negative period here --
// confirm EpochEnforcer tolerates that.
19  enforcer_(1 / config.max_send_hertz)
20 {
// Constructor body: optionally runs setup() right away, then wires up the
// debug counters when a knowledge-base prefix has been configured.
21  if(launch_transport)
22  setup();
23 
// Debug statistics are only bound when debug_to_kb_prefix is non-empty.
24  if(config.debug_to_kb_prefix != "")
25  {
// `kb` is declared on a line that is absent from this listing; it is pointed
// at the transport's context so the counters below live in that context.
27  kb.use(context);
28 
// Each counter is named "<prefix>.<counter>" inside kb.
29  sent_packets.set_name(config.debug_to_kb_prefix + ".sent_packets", kb);
30  failed_sends.set_name(config.debug_to_kb_prefix + ".failed_sends", kb);
31  sent_data_max.set_name(config.debug_to_kb_prefix + ".sent_data_max", kb);
32  sent_data_min.set_name(config.debug_to_kb_prefix + ".sent_data_min", kb);
33  sent_data.set_name(config.debug_to_kb_prefix + ".sent_data", kb);
34  }
35 }
36 
38 {
// Unconditionally reports BEST_EFFORT. The signature line for this function
// is not present in this listing.
39  return BEST_EFFORT;
40 }
41 
43 {
// Unconditionally returns BEST_EFFORT; nothing is read or stored here.
// NOTE(review): the signature is not visible in this listing -- if this is
// the setter overload, the requested value is presumably ignored; confirm.
44  return BEST_EFFORT;
45 }
46 
// Starts one UDP read thread.
//
// hertz: forwarded unchanged to read_threads_.run().
// name:  name the thread is registered under; also written to the log.
// Returns 0 unconditionally.
47 int UdpTransport::setup_read_thread(double hertz, const std::string& name)
48 {
// (The line that opens this log call is absent from this listing.)
50  "UdpTransport::setup_read_thread:"
51  " Starting UdpTransport read thread: %s\n",
52  name.c_str());
// The thread object is created with a raw `new` and handed to the threader.
// NOTE(review): presumably read_threads_ takes ownership and deletes it --
// confirm against Threader::run.
53  read_threads_.run(hertz, name, new UdpTransportReadThread(*this));
54 
55  return 0;
56 }
57 
59 {
// Prepares socket_ for reading. Returns 0 on success and -1 on failure.
// The signature and the condition guarding the early return below are on
// lines that are absent from this listing.
61  {
62  return -1;
63  }
64 
65  try
66  {
// Reads must not block the caller.
67  socket_.non_blocking(true);
68 
// Bind to the IPv4 wildcard address on the port of the first configured
// address.
// NOTE(review): addresses_[0] is indexed with no emptiness check visible
// here; confirm the (missing) guard above covers an empty addresses_.
69  socket_.bind(udp::endpoint(ip::address_v4::any(), addresses_[0].port()));
70 
72  "UdpTransport::setup_read_socket:"
73  " Bound to port: %d\n",
74  (int)addresses_[0].port());
75  }
76  catch (const boost::system::system_error& e)
77  {
// NOTE(review): this message is prefixed "UdpTransport::setup:" while the
// success message above uses "UdpTransport::setup_read_socket:".
79  "UdpTransport::setup:"
80  " Error setting up read socket: %s\n",
81  e.what());
82 
// A failed non_blocking/bind marks the whole transport as invalid.
83  this->invalidate_transport();
84  return -1;
85  }
86 
87  return 0;
88 }
89 
91 {
// Returns -1 when the check below fails and 0 otherwise. Both the signature
// and the checked condition are on lines that are absent from this listing.
93  {
94  return -1;
95  }
96 
97  return 0;
98 }
99 
100 
102  char* buf, size_t & bytes_read, udp::endpoint& remote)
103 {
// Receives one datagram from socket_ using the non-throwing receive_from
// overload.
//
// buf:        destination buffer; up to settings_.queue_length bytes are
//             written into it.
//             NOTE(review): assumes the caller's buffer is at least
//             queue_length bytes long -- confirm at the call sites.
// bytes_read: set to the value returned by receive_from.
// remote:     filled in by receive_from with the sender's endpoint.
//
// Returns 0 when data was read, 1 when the call would block or read zero
// bytes, and 2 for any other error.
104  boost::system::error_code err;
105  bytes_read = socket_.receive_from(
106  asio::buffer((void*)buf, settings_.queue_length), remote,
107  udp::socket::message_flags{}, err);
108 
// NOTE(review): the `bytes_read == 0` test is evaluated before the general
// error test. The error_code overload of receive_from normally reports 0
// bytes on failure, so genuine errors are likely classified as "no bytes to
// read" (return 1) and the `else if (err)` branch may be unreachable in
// practice -- confirm against the Asio documentation.
109  if (err == asio::error::would_block || bytes_read == 0)
110  {
112  "UdpTransport::receive_buffer: "
113  " no bytes to read. Proceeding to next wait\n");
114 
115  return 1;
116  }
117  else if (err)
118  {
120  "UdpTransport::receive_buffer: unexpected error: %s. "
121  "Proceeding to next wait\n",
122  err.message().c_str());
123 
124  return 2;
125  }
126 
127  return 0;
128 }
129 
131  const udp::endpoint& target, const char* buf, size_t size)
132 {
// Sends `size` bytes from `buf` to `target`, retrying while send_to throws.
//
// Returns the number of bytes reported sent by the successful attempt, or 0
// if every attempt failed.
133  uint64_t bytes_sent = 0;
134 
// send_attempts starts at -1, so a resend_attempts value of N allows N + 1
// attempts in total; a negative resend_attempts retries until a send stops
// throwing.
135  int send_attempts = -1;
136  ssize_t actual_sent = -1;
137 
138  while (actual_sent < 0 && (settings_.resend_attempts < 0 ||
139  send_attempts < settings_.resend_attempts))
140  {
// Rate limiting applies only when a positive max_send_hertz is configured.
// The statement inside this block is absent from this listing (presumably
// the enforcer_ sleep -- confirm against the real source).
141  if(settings_.max_send_hertz > 0)
142  {
144  }
145 
146  // send the fragment
147  try
148  {
149  actual_sent = socket_.send_to(asio::buffer(buf, size), target);
150  }
151  catch (const boost::system::system_error& e)
152  {
154  "UdpTransport::send_buffer:"
155  " Error sending packet to %s:%d: %s\n",
156  target.address().to_string().c_str(), (int)target.port(), e.what());
157 
158  // ensure erroneous data is not being used
159  actual_sent = -1;
160  }
161 
162  ++send_attempts;
// NOTE(review): sent_packets is incremented on every attempt, including
// attempts that threw, so it counts attempts rather than successful sends.
163  if(settings_.debug_to_kb_prefix != "")
164  {
165  ++sent_packets;
166  }
167 
168  if(actual_sent > 0)
169  {
171  "UdpTransport::send_buffer: Sent %d byte packet to %s:%d\n",
172  (int)actual_sent, target.address().to_string().c_str(),
173  (int)target.port());
174 
175  bytes_sent += actual_sent;
176 
// Debug statistics: running total plus the largest and smallest packet seen.
// A sent_data_min of 0 is treated as "not yet set".
177  if(settings_.debug_to_kb_prefix != "")
178  {
179  sent_data += actual_sent;
180  if(sent_data_max < actual_sent)
181  {
182  sent_data_max = actual_sent;
183  }
184  if(sent_data_min > actual_sent || sent_data_min == 0)
185  {
186  sent_data_min = actual_sent;
187  }
188  }
189  }
190  else
191  {
// Reached for a thrown send (actual_sent == -1) and for a send that
// reported 0 bytes; the latter also ends the loop because actual_sent is no
// longer negative.
192  if(settings_.debug_to_kb_prefix != "")
193  {
194  ++failed_sends;
195  }
196  }
197  }
198 
199  return (long)bytes_sent;
200 }
201 
// Sends one prepared message to every endpoint in addresses_ for which
// pre_send_buffer() returns true, fragmenting it first when it exceeds
// settings_.max_fragment_size.
//
// buf:         message bytes to send.
// packet_size: number of bytes in buf.
// clock:       clock value passed to frag() when fragmenting.
//
// Returns the total number of bytes reported sent across all endpoints (and
// all fragments), cast to long. Several lines of this function, including
// the openings of the log calls, are absent from this listing.
202 long UdpTransport::send_message(const char* buf, size_t packet_size,
203  uint64_t clock)
204 {
205  static const char print_prefix[] = "UdpTransport::send_message";
206 
207  uint64_t bytes_sent = 0;
208 
209  if(packet_size > settings_.max_fragment_size)
210  {
211  FragmentMap map;
212 
// NOTE(review): packet_size is a size_t but is printed with PRIu64; these
// only agree where size_t is 64 bits wide.
214  "%s:"
215  " fragmenting %" PRIu64 " byte packet (%" PRIu32
216  " bytes is max fragment size)\n",
217  print_prefix, packet_size, settings_.max_fragment_size);
218 
219  // fragment the message
// (The final arguments of this frag() call are on a line absent from this
// listing.)
220  frag(buf, packet_size, id_.c_str (), settings_.write_domain.c_str(),
221  clock, utility::get_time(), 0, 0,
223 
// j only numbers the fragments for the log output.
224  int j(0);
225  for(FragmentMap::iterator i = map.begin(); i != map.end(); ++i, ++j)
226  {
228  "%s:"
229  " Sending fragment %d\n",
230  print_prefix, j);
231 
232  for(const auto& address : addresses_)
233  {
// The endpoint's index is recovered by pointer subtraction from the first
// element, which relies on addresses_ being contiguous storage.
234  if(pre_send_buffer(&address - &*addresses_.begin()))
235  {
// Each fragment's length is read back from its own message header.
236  bytes_sent += send_buffer(
237  address, i->second.get(),
238  (size_t)MessageHeader::get_size(i->second.get()));
239  }
240  }
241 
242  // sleep between fragments, if such a slack time is specified
// (The statement controlled by this check is absent from this listing.)
243  if(settings_.slack_time > 0)
245  }
246 
248  "%s:"
249  " Sent fragments totalling %" PRIu64 " bytes\n",
250  print_prefix, bytes_sent);
251 
252  delete_fragments(map);
253  }
254  else
255  {
// NOTE(review): "%ld" is given a size_t (packet_size); "%zu" is the matching
// specifier.
257  "%s:"
258  " Sending packet of size %ld\n",
259  print_prefix, packet_size);
260 
261  for(const auto& address : addresses_)
262  {
263  size_t addr_index = &address - &*addresses_.begin();
264  bool should_send = pre_send_buffer(addr_index);
265 
// NOTE(review): the "(index %d)" specifier is given addr_index, a size_t;
// the argument width does not match where size_t is wider than int.
267  "%s:"
268  " Deciding to send to %s:%d (index %d): %d\n",
269  print_prefix, address.address().to_string().c_str(),
270  (int)address.port(), addr_index, should_send);
271 
272  if(should_send)
273  {
274  bytes_sent += send_buffer(address, buf, (size_t)packet_size);
275  }
276  }
277  }
278 
// Feed the bandwidth monitor only when something was actually sent.
// NOTE(review): the count is narrowed to uint32_t before being passed on,
// although the documented signature is add(uint64_t size); totals above
// 4 GiB would wrap.
279  if(bytes_sent > 0)
280  {
281  send_monitor_.add((uint32_t)bytes_sent);
282  }
283 
285  "%s:"
286  " Send bandwidth = %" PRIu64 " B/s\n",
287  print_prefix, send_monitor_.get_bytes_per_second());
288 
289  return (long)bytes_sent;
290 }
291 
293  const knowledge::KnowledgeMap& orig_updates)
294 {
// Prepares the given updates and sends them to the configured addresses.
//
// Returns 0 when sending is disabled or orig_updates is empty; otherwise
// the prep_send() result, replaced by the send_message() result when there
// is at least one address and prep_send() returned a positive value.
295  long result(0);
296  const char* print_prefix = "UdpTransport::send_data";
297 
298  if(!settings_.no_sending && orig_updates.size() != 0)
299  {
300  result = prep_send(orig_updates, print_prefix);
301 
// The positive prep_send() result is passed on as the packet size of
// buffer_. The clock handed to send_message is that of the first record in
// orig_updates (map order).
302  if(addresses_.size() > 0 && result > 0)
303  {
304  result = send_message(
305  buffer_.get_ptr(), result, orig_updates.begin()->second.clock);
306  }
307  }
308 
309  return result;
310 }
311 }
312 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thr...
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:54
void run(const std::string &name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:62
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
void add(uint64_t size)
Adds a message to the monitor.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:153
QoSTransportSettings settings_
Definition: Transport.h:132
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:135
const std::string id_
host:port identifier of this process
Definition: Transport.h:130
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:144
long prep_send(const knowledge::KnowledgeMap &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:860
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:31
threads::Threader read_threads_
threads for reading knowledge updates
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
udp::socket socket_
underlying socket
int setup() override
All subclasses should call this method at the end of their setup.
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Holds basic transport settings.
double max_send_hertz
Maximum rate of sending messages.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
uint32_t queue_length
Length of the buffer used to store history of events.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
int resend_attempts
Maximum number of attempts to resend if transport is busy.
double slack_time
Time to sleep between sends and rebroadcasts.
knowledge::containers::Integer sent_packets
sent packets
Definition: UdpTransport.h:66
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
int reliability(void) const
Accesses reliability setting.
knowledge::containers::Integer sent_data_min
min data sent
Definition: UdpTransport.h:78
virtual bool pre_send_buffer(size_t addr_index)
Definition: UdpTransport.h:106
utility::EpochEnforcer< utility::Clock > enforcer_
enforces epochs when user specifies a max_send_hertz
Definition: UdpTransport.h:112
knowledge::containers::Integer sent_data
sent data
Definition: UdpTransport.h:72
long receive_buffer(char *buf, size_t &bytes_read, udp::endpoint &remote)
Receives a buffer from a remote host.
long send_message(const char *buf, size_t size, uint64_t clock)
int setup_read_thread(double hertz, const std::string &name) override
knowledge::containers::Integer failed_sends
failed sends
Definition: UdpTransport.h:69
UdpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Sends a buffer to a host endpoint.
knowledge::containers::Integer sent_data_max
max data sent
Definition: UdpTransport.h:75
void sleep_until_next(void)
Sleeps until the next epoch.
Definition: EpochEnforcer.h:91
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
constexpr string_t string
::std::map< std::string, KnowledgeRecord > KnowledgeMap
std::map< uint32_t, utility::ScopedArray< const char > > FragmentMap
Map of fragment identifiers to fragments.
MADARA_EXPORT void frag(const char *source, uint64_t total_size, const char *originator, const char *domain, uint64_t clock, uint64_t timestamp, uint32_t quality, unsigned char ttl, uint64_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:555
int64_t get_time(void)
Returns the time of day in nanoseconds. If the simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
Copyright(c) 2020 Galois.