MADARA  3.4.1
UdpTransportReadThread.cpp
Go to the documentation of this file.
2 
5 
6 #include <iostream>
7 
8 namespace madara
9 {
10 namespace transport
11 {
13  : transport_(transport)
14 {
    // Constructor body is empty: the only work is initializing transport_
    // from the `transport` argument in the initializer list above.
    // NOTE(review): the constructor's signature line is not present in this
    // listing, so the parameter's exact type cannot be confirmed from here.
15 }
16 
18 {
    // Prepares the read thread for use: caches a pointer to the knowledge
    // base's context, assigns the receive buffer, and, when
    // settings_.debug_to_kb_prefix is non-empty, prepares names of the form
    // "<prefix>.received_packets", "<prefix>.failed_receives", etc.
    //
    // NOTE(review): this listing skips several embedded line numbers
    // (27, 37, 44, 49, 56, 58, 60, 62, 64, 66). The statements that open the
    // log calls, compile the on_data_received logic, declare `kb`, and consume
    // the "<prefix>.*" names are therefore not visible here; confirm against
    // the real source before relying on this description.
19  const QoSTransportSettings& settings_ = transport_.settings_;
20 
21  context_ = &(knowledge.get_context());
22 
23  // setup the receive buffer
    // buffer_ is only assigned when queue_length is positive; run() checks
    // buffer_.get_ptr() against 0 and returns early if no buffer is available.
24  if (settings_.queue_length > 0)
25  buffer_ = new char[settings_.queue_length];
26 
28  "UdpTransportReadThread::init:"
29  " UdpTransportReadThread started with queue length %d\n",
30  settings_.queue_length);
31 
32  if (context_)
33  {
34  // check for an on_data_received ruleset
35  if (settings_.on_data_received_logic.length() != 0)
36  {
38  "UdpTransportReadThread::init:"
39  " setting rules to %s\n",
40  settings_.on_data_received_logic.c_str());
41 
    // The interpreter is only declared when KaRL support is compiled in.
    // Presumably it compiles on_data_received_logic on the omitted line 44;
    // TODO confirm.
42 #ifndef _MADARA_NO_KARL_
43  expression::Interpreter interpreter;
45 #endif // _MADARA_NO_KARL_
46  }
47  else
48  {
50  "UdpTransportReadThread::init:"
51  " no permanent rules were set\n");
52  }
53 
    // Debug counters are only wired to the knowledge base when a prefix is set.
54  if (settings_.debug_to_kb_prefix != "")
55  {
    // `kb` is made to refer to the cached context rather than owning its own.
57  kb.use(*context_);
    // Each line below is the argument list of a call whose opening line is
    // missing from this listing (presumably <counter>.set_name(...); verify).
59  settings_.debug_to_kb_prefix + ".received_packets", kb);
61  settings_.debug_to_kb_prefix + ".failed_receives", kb);
63  settings_.debug_to_kb_prefix + ".received_data_max", kb);
65  settings_.debug_to_kb_prefix + ".received_data_min", kb);
67  settings_.debug_to_kb_prefix + ".received_data", kb);
68  }
69  }
70 }
71 
73 
// Sends a rebroadcast packet built from `records`.
//
// print_prefix: prefix passed through to prep_rebroadcast (and used in the
//               "%s:" log format below).
// header:       message header passed through to prep_rebroadcast.
// records:      records to rebroadcast; nothing happens if this map is empty.
//
// Does nothing when settings_.no_sending is true or `records` is empty.
// Otherwise prep_rebroadcast fills the buffer; if it returns a positive value,
// that many bytes are handed to transport_.send_message.
//
// NOTE(review): this listing skips embedded line numbers 95, 99, 105, 115,
// 119 and 122, so some statements (including part of the sent_data_min
// condition and the log call opener) are not visible here.
74 void UdpTransportReadThread::rebroadcast(const char* print_prefix,
75  MessageHeader* header, const knowledge::KnowledgeMap& records)
76 {
77  const QoSTransportSettings& settings_ = transport_.settings_;
78 
    // Uses the pointer held by buffer_, which is the same buffer run()
    // receives into; the full queue_length is offered as available space.
79  int64_t buffer_remaining = (int64_t)settings_.queue_length;
80  char* buffer = buffer_.get_ptr();
81  int result(0);
82 
83  if (!settings_.no_sending && records.size () > 0)
84  {
85  result = prep_rebroadcast(*context_, buffer, buffer_remaining, settings_,
86  print_prefix, header, records, transport_.packet_scheduler_);
87 
    // A positive result is used as the number of bytes to send.
88  if (result > 0)
89  {
    // records is known to be non-empty here, so begin() is dereferenceable;
    // the clock of the first record in the map is passed to send_message.
90  uint64_t clock = records.begin()->second.clock;
91  result = transport_.send_message(buffer, result, clock);
92 
    // From here on `result` is send_message's return value.
93  if (result > 0)
94  {
    // Send statistics are only maintained when a debug prefix is configured.
96  if (settings_.debug_to_kb_prefix != "")
97  {
98  transport_.sent_data += result;
100  if (transport_.sent_data_max < result)
101  {
102  transport_.sent_data_max = result;
103  }
    // NOTE(review): the second operand of this || is on omitted line 105.
104  if (transport_.sent_data_min > result ||
106  {
107  transport_.sent_data_min = result;
108  }
109  }
110  }
111  else
112  {
113  if (settings_.debug_to_kb_prefix != "")
114  {
    // NOTE(review): body (line 115) is missing from this listing; presumably
    // it records a failed send. Verify against the real source.
116  }
117  }
118 
120  "%s:"
121  " Send bandwidth = %" PRIu64 " B/s\n",
123  }
124  }
125 }
126 
128 {
    // One pass of the read thread: performs a single receive_from on
    // transport_.socket_, passes the received bytes to
    // process_received_update, and rebroadcasts the resulting records when
    // the header's ttl and the participant ttl allow it.
    //
    // Returns early (without processing) when receiving is disabled, when no
    // buffer is available, when the receive would block or yields 0 bytes,
    // or when the receive reports any other error.
    //
    // NOTE(review): this listing skips many embedded line numbers (140, 147,
    // 155, 166, 171, 178, 184, 193, 207, 215, 234, 253), so the log call
    // openers and some counter updates are not visible here.
129  const QoSTransportSettings& settings_ = transport_.settings_;
130 
131  if (settings_.no_receiving)
132  {
133  return;
134  }
135 
136  // fetch the buffer held by buffer_ (assigned in init); nothing is allocated here
137  char* buffer = buffer_.get_ptr();
138  static const char print_prefix[] = "UdpTransportReadThread::run";
139 
141  "%s:"
142  " entering main service loop.\n",
143  print_prefix);
144 
    // init() only assigns buffer_ when queue_length > 0, so a null pointer
    // here means there is nothing to receive into.
145  if (buffer == 0)
146  {
148  "%s:"
149  " Unable to allocate buffer of size %" PRIu32 ". Exiting thread.\n",
150  print_prefix, settings_.queue_length);
151 
152  return;
153  }
154 
156  "%s: entering a recv on the socket.\n", print_prefix);
157 
    // The error_code overload is used, so failures are reported through
    // `err` and checked below. `remote` receives the sender's endpoint.
158  udp::endpoint remote;
159  boost::system::error_code err;
160  size_t bytes_read = transport_.socket_.receive_from(
161  asio::buffer((void*)buffer, settings_.queue_length), remote,
162  udp::socket::message_flags{}, err);
163 
    // would_block and zero-length reads are treated as "nothing to read".
164  if (err == asio::error::would_block || bytes_read == 0)
165  {
167  "%s: no bytes to read. Proceeding to next wait\n", print_prefix);
168 
169  if (settings_.debug_to_kb_prefix != "")
170  {
    // NOTE(review): body (line 171) is missing from this listing.
172  }
173 
174  return;
175  }
176  else if (err)
177  {
179  "%s: unexpected error: %s. Proceeding to next wait\n", print_prefix,
180  err.message().c_str());
181 
182  if (settings_.debug_to_kb_prefix != "")
183  {
    // NOTE(review): body (line 184) is missing from this listing.
185  }
186 
187  return;
188  }
189 
    // Receive statistics are only maintained when a debug prefix is configured.
190  if (settings_.debug_to_kb_prefix != "")
191  {
192  received_data_ += bytes_read;
194 
195  if (received_data_max_ < bytes_read)
196  {
197  received_data_max_ = bytes_read;
198  }
    // A minimum of 0 is overwritten as well; presumably 0 means "no minimum
    // recorded yet" (TODO confirm).
199  if (received_data_min_ > bytes_read || received_data_min_ == 0)
200  {
201  received_data_min_ = bytes_read;
202  }
203  }
204 
    // Only the log message differs between these two branches.
205  if (remote.address().to_string() != "")
206  {
208  "%s:"
209  " received a message header of %lld bytes from %s:%d\n",
210  print_prefix, (long long)bytes_read,
211  remote.address().to_string().c_str(), (int)remote.port());
212  }
213  else
214  {
216  "%s:"
217  " received %lld bytes from unknown host\n",
218  print_prefix, (long long)bytes_read);
219  }
220 
    // `header` is passed by reference to process_received_update below and is
    // deleted by this function if it comes back non-null.
221  MessageHeader* header = 0;
222 
    // Build the "address:port" string passed as remote_host.
223  std::stringstream remote_host;
224  remote_host << remote.address().to_string();
225  remote_host << ":";
226  remote_host << remote.port();
227 
228  knowledge::KnowledgeMap rebroadcast_records;
229 
230  process_received_update(buffer, (uint32_t)bytes_read, transport_.id_,
231  *context_, settings_, transport_.send_monitor_,
232  transport_.receive_monitor_, rebroadcast_records,
233 #ifndef _MADARA_NO_KARL_
235 #endif // _MADARA_NO_KARL_
236  print_prefix, remote_host.str().c_str(), header);
237 
238  if (header)
239  {
    // Rebroadcast only if the message still has ttl left, there are records
    // to forward, and this participant has a non-zero participant ttl.
240  if (header->ttl > 0 && rebroadcast_records.size() > 0 &&
241  settings_.get_participant_ttl() > 0)
242  {
    // Decrement the ttl, then cap it at this participant's ttl limit.
243  --header->ttl;
244  header->ttl = std::min(settings_.get_participant_ttl(), header->ttl);
245 
246  rebroadcast(print_prefix, header, rebroadcast_records);
247  }
248 
249  // delete header
250  delete header;
251  }
252 
254  "%s:"
255  " finished iteration.\n",
256  print_prefix);
257 }
258 }
259 }
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:43
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:54
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
void add(uint64_t size)
Adds a message to the monitor.
QoSTransportSettings settings_
Definition: Transport.h:132
const std::string id_
host:port identifier of this process
Definition: Transport.h:130
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:144
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:150
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:147
udp::socket socket_
underlying socket
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
Container for quality-of-service settings.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
bool no_receiving
if true, never receive over transport
std::string on_data_received_logic
Logic to be evaluated after every successful update.
uint32_t queue_length
Length of the buffer used to store history of events.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
bool no_sending
if true, never send over transport
knowledge::containers::Integer received_data_
received data
knowledge::containers::Integer received_packets_
received packets
void cleanup(void) override
Cleanup function called by thread manager.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
madara::utility::ScopedArray< char > buffer_
buffer for receiving
knowledge::containers::Integer received_data_max_
max data received
knowledge::containers::Integer received_data_min_
min data received
void init(knowledge::KnowledgeBase &knowledge) override
Initializes MADARA context-related items.
void run(void) override
The main loop internals for the read thread.
knowledge::containers::Integer failed_receives_
bad receives
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
UDP-based transport for knowledge.
Definition: UdpTransport.h:39
knowledge::containers::Integer sent_packets
sent packets
Definition: UdpTransport.h:66
knowledge::containers::Integer sent_data_min
min data sent
Definition: UdpTransport.h:78
knowledge::containers::Integer sent_data
sent data
Definition: UdpTransport.h:72
long send_message(const char *buf, size_t size, uint64_t clock)
knowledge::containers::Integer failed_sends
failed sends
Definition: UdpTransport.h:69
knowledge::containers::Integer sent_data_max
max data sent
Definition: UdpTransport.h:75
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:791
Copyright(c) 2020 Galois.