MADARA  3.2.3
UdpTransport.cpp
Go to the documentation of this file.
4 
8 
9 #include <iostream>
10 
11 namespace madara { namespace transport {
12 
15  TransportSettings & config, bool launch_transport)
16 : BasicASIOTransport (id, context, config)
17 {
    // Constructor body. The id, context and config arguments are forwarded
    // unchanged to the BasicASIOTransport base; the transport is only set up
    // here when the caller asks for it via launch_transport.
    // NOTE(review): the opening of this signature is on lines missing from
    // this listing.
18  if (launch_transport)
19  setup ();
20 }
21 
/**
 * Returns BEST_EFFORT unconditionally.
 * NOTE(review): the line carrying the function name (listing line 23) is
 * missing here; presumably this is UdpTransport::reliability (void) const --
 * confirm against the header.
 **/
22 int
24 {
25  return BEST_EFFORT;
26 }
27 
/**
 * Returns BEST_EFFORT unconditionally.
 * NOTE(review): the line carrying the function name and parameters (listing
 * line 29) is missing here; presumably this is the setter overload of
 * UdpTransport::reliability -- confirm against the header.
 * @return BEST_EFFORT
 **/
28 int
30 {
31  return BEST_EFFORT;
32 }
33 
/**
 * Starts one UDP read thread and registers it with read_threads_.
 * NOTE(review): the function-name line (listing line 35) and the head of the
 * logging call (listing line 38) are missing from this listing.
 * @param hertz  rate passed through to read_threads_.run
 * @param name   thread name, used for the log message and for
 *               read_threads_.run
 * @return 0 in every visible path
 **/
34 int
36  double hertz, const std::string &name)
37 {
39  "UdpTransport::setup_read_thread:" \
40  " Starting UdpTransport read thread: %s\n", name.c_str ());
    // The read thread object is heap-allocated here and handed to the
    // threader; presumably the threader takes ownership -- TODO confirm.
41  read_threads_.run (hertz, name,
42  new UdpTransportReadThread (*this));
43 
44  return 0;
45 }
46 
/**
 * Puts socket_ into non-blocking mode and binds it to the IPv4 wildcard
 * address on the port of addresses_[0].
 * NOTE(review): the function-name line (listing line 48), the guard
 * condition for the first early return (listing line 50) and the heads of
 * both logging calls (listing lines 60 and 64) are missing from this
 * listing. The name is taken from the "UdpTransport::setup_read_socket:"
 * log prefix below.
 * @return 0 on success; -1 if the (missing) guard fails or if the socket
 *         setup throws boost::system::system_error
 **/
47 int
49 {
51  return -1;
52  }
53 
54  try {
55  socket_.non_blocking(true);
56 
    // Only the port of the first configured address is used for binding.
    // NOTE(review): addresses_[0] is indexed without a visible emptiness
    // check; the missing guard above may cover this -- confirm.
57  socket_.bind(udp::endpoint (ip::address_v4::any (),
58  addresses_[0].port ()));
59 
61  "UdpTransport::setup_read_socket:" \
62  " Bound to port: %d\n", (int)addresses_[0].port ());
63  } catch (const boost::system::system_error &e) {
    // NOTE(review): this message is prefixed "UdpTransport::setup:" while
    // the success message uses "UdpTransport::setup_read_socket:".
65  "UdpTransport::setup:" \
66  " Error setting up read socket: %s\n", e.what ());
67 
    // A failed bind marks the whole transport as invalid before reporting
    // the error to the caller.
68  this->invalidate_transport ();
69  return -1;
70  }
71 
72  return 0;
73 }
74 
/**
 * Returns -1 when a guard condition fails, otherwise 0.
 * NOTE(review): the function-name line (listing line 76) and the guard
 * condition (listing line 78) are missing from this listing; from its
 * position after the read-socket setup this is presumably the write-socket
 * setup -- confirm against the header.
 * @return -1 if the (missing) guard fails, 0 otherwise
 **/
75 int
77 {
79  return -1;
80  }
81 
82  return 0;
83 }
84 
/**
 * Sends one datagram of `size` bytes from `buf` to `target` through socket_,
 * retrying while the send has not succeeded.
 * NOTE(review): the function-name line (listing line 86), the first half of
 * the parenthesized retry condition (listing line 95) and the heads of both
 * logging calls (listing lines 104 and 115) are missing from this listing.
 * @param target  endpoint the datagram is sent to
 * @param buf     start of the bytes to send
 * @param size    number of bytes to send
 * @return number of bytes reported sent by send_to, or 0 if no attempt
 *         reported a positive byte count
 **/
85 long
87  const udp::endpoint &target, const char *buf, size_t size)
88 {
89  uint64_t bytes_sent = 0;
90 
    // send_attempts starts at -1 and is incremented after every try;
    // actual_sent stays negative until a send_to call returns.
91  int send_attempts = -1;
92  ssize_t actual_sent = -1;
93 
    // Retry only while nothing has been sent and the attempt budget
    // (settings_.resend_attempts) allows it.
94  while (actual_sent < 0 &&
96  send_attempts < settings_.resend_attempts))
97  {
98 
99  // send the fragment
100  try {
101  actual_sent = socket_.send_to (
102  asio::buffer(buf, size), target);
103  } catch (const boost::system::system_error &e) {
    // A throwing send_to leaves actual_sent unchanged, so the loop
    // condition decides whether another attempt is made.
105  "UdpTransport::send_buffer:" \
106  " Error sending packet to %s:%d: %s\n",
107  target.address ().to_string ().c_str (), (int)target.port (),
108  e.what ());
109  }
110 
111  ++send_attempts;
112 
113  if (actual_sent > 0)
114  {
116  "UdpTransport::send_buffer: Sent %d byte packet to %s:%d\n",
117  (int)actual_sent,
118  target.address ().to_string ().c_str (),
119  (int)target.port ());
120 
121  bytes_sent += actual_sent;
122  }
123  }
124 
125  return (long)bytes_sent;
126 }
127 
/**
 * Sends a packet to every endpoint in addresses_ for which pre_send_buffer
 * returns true. Packets larger than settings_.max_fragment_size are split
 * with frag () and sent fragment by fragment; smaller packets are sent as
 * they are. A positive byte total is recorded in send_monitor_.
 *
 * NOTE(review): the heads of the logging calls (listing lines 139, 150, 168,
 * 177, 187, 207) and the statement controlled by the slack-time check
 * (listing line 165) are missing from this listing.
 * NOTE(review): packet_size is a size_t but is printed with PRIu64 in the
 * fragmenting message and with %ld in the non-fragmenting message;
 * addr_index is a size_t printed with %d. These match only where the
 * platform's type sizes happen to agree -- confirm.
 *
 * @param buf          start of the packet to send
 * @param packet_size  size of the packet in bytes
 * @return total bytes sent across all endpoints (and fragments), as long
 **/
128 long
129 UdpTransport::send_message (const char *buf, size_t packet_size)
130 {
131  static const char print_prefix[] = "UdpTransport::send_message";
132 
133  uint64_t bytes_sent = 0;
134 
    // Oversized packets take the fragmenting path; everything else is sent
    // in a single datagram per endpoint.
135  if (packet_size > settings_.max_fragment_size)
136  {
137  FragmentMap map;
138 
140  "%s:" \
141  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
142  print_prefix, packet_size, settings_.max_fragment_size);
143 
144  // fragment the message
145  frag (buf, settings_.max_fragment_size, map);
146 
    // NOTE(review): j is only used in the "Sending fragment" log message
    // and is never incremented in the visible code, so that message
    // always reports fragment 0.
147  int j (0);
148  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
149  {
151  "%s:" \
152  " Sending fragment %d\n",
153  print_prefix, j);
154 
    // The pointer difference against the first element yields this
    // endpoint's index within addresses_, which pre_send_buffer uses
    // to decide whether to send. Each fragment's length is read from
    // its own header via MessageHeader::get_size.
155  for (const auto &address : addresses_)
156  {
157  if (pre_send_buffer (&address - &*addresses_.begin ())) {
158  bytes_sent += send_buffer(address, i->second,
159  (size_t)MessageHeader::get_size (i->second));
160  }
161  }
162 
163  // sleep between fragments, if such a slack time is specified
164  if (settings_.slack_time > 0)
166  }
167 
169  "%s:" \
170  " Sent fragments totalling %" PRIu64 " bytes\n",
171  print_prefix, bytes_sent);
172 
    // The fragments in the map are released once all have been sent.
173  delete_fragments (map);
174  }
175  else
176  {
178  "%s:" \
179  " Sending packet of size %ld\n",
180  print_prefix, packet_size);
181 
182  for (const auto &address : addresses_)
183  {
    // Index of this endpoint within addresses_, computed from the
    // element's address.
184  size_t addr_index = &address - &*addresses_.begin ();
185  bool should_send = pre_send_buffer(addr_index);
186 
188  "%s:" \
189  " Deciding to send to %s:%d (index %d): %d\n",
190  print_prefix,
191  address.address ().to_string ().c_str (),
192  (int)address.port (),
193  addr_index, should_send);
194 
195  if (should_send) {
196  bytes_sent += send_buffer(address, buf, (size_t)packet_size);
197  }
198  }
199  }
200 
    // NOTE(review): bytes_sent is a uint64_t narrowed to uint32_t here;
    // totals above 4 GiB would be truncated before reaching the monitor.
201  if (bytes_sent > 0)
202  {
203  send_monitor_.add ((uint32_t)bytes_sent);
204  }
205 
206 
208  "%s:" \
209  " Send bandwidth = %" PRIu64 " B/s\n",
210  print_prefix, send_monitor_.get_bytes_per_second ());
211 
212  return (long) bytes_sent;
213 }
214 
/**
 * Prepares the given knowledge updates with prep_send and, if that yields a
 * positive size and at least one address is configured, sends the prepared
 * buffer_ with send_message. Nothing is prepared or sent when
 * settings_.no_sending is set.
 * NOTE(review): the function-name line (listing line 216) is missing from
 * this listing.
 * @param orig_updates  the knowledge updates to send
 * @return 0 when sending is disabled; otherwise the prep_send result, or
 *         the send_message result when a send was attempted
 **/
215 long
217  const knowledge::VariableReferenceMap & orig_updates)
218 {
219  long result (0);
220  const char * print_prefix = "UdpTransport::send_data";
221 
222  if (!settings_.no_sending)
223  {
224  result = prep_send (orig_updates, print_prefix);
225 
    // The positive prep_send result is passed on as the size of the
    // packet held in buffer_.
226  if (addresses_.size () > 0 && result > 0)
227  {
228  result = send_message (buffer_.get_ptr (), result);
229  }
230  }
231 
232  return result;
233 }
234 
235 } }
QoSTransportSettings settings_
Definition: Transport.h:133
long send_message(const char *buf, size_t size)
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
UdpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:33
udp::socket socket_
underlying socket
This class stores variables and their values for use by any entity needing state information in a thr...
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
virtual bool pre_send_buffer(size_t addr_index)
Definition: UdpTransport.h:70
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:71
double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:619
static struct madara::knowledge::tags::string_t string
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
double slack_time
time to sleep between sends and rebroadcasts
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int reliability(void) const
Accesses reliability setting.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
threads::Threader read_threads_
threads for reading knowledge updates
int setup() override
All subclasses should call this method at the end of their setup.
int setup_read_thread(double hertz, const std::string &name) override
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:154
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:812
long send_data(const madara::knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
Thread for reading knowledge updates through a UDP socket.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:145
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:136
int resend_attempts
Maximum number of attempts to resend if transport is busy.
MADARA_EXPORT void frag(const char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.