MADARA  3.2.3
UdpTransportReadThread.cpp
Go to the documentation of this file.
2 
5 
6 #include <iostream>
7 
8 namespace madara { namespace transport {
9 
// Constructor fragment (the line that names the constructor is missing from
// this listing). It initializes transport_ from the UdpTransport passed in;
// the body itself is empty.
11  UdpTransport &transport)
12  : transport_ (transport)
13 {
14 }
15 
// init: caches the knowledge base's context, allocates the receive buffer and
// handles the optional on_data_received rule string from the settings.
// NOTE(review): the signature line and the opening lines of the logger calls
// are missing from this listing; only their string/argument lines remain.
16 void
19 {
20  const QoSTransportSettings &settings_ = transport_.settings_;
21 
// keep a pointer to the context owned by the knowledge base
22  context_ = &(knowledge.get_context ());
23 
24  // setup the receive buffer
// the buffer is only allocated for a non-zero queue length; otherwise
// buffer_ is left as it was
25  if (settings_.queue_length > 0)
26  buffer_ = new char [settings_.queue_length];
27 
29  "UdpTransportReadThread::init:" \
30  " UdpTransportReadThread started with queue length %d\n",
31  settings_.queue_length);
32 
33  if (context_)
34  {
35  // check for an on_data_received ruleset
36  if (settings_.on_data_received_logic.length () != 0)
37  {
39  "UdpTransportReadThread::init:" \
40  " setting rules to %s\n",
41  settings_.on_data_received_logic.c_str ());
42 
43 
// Only when KaRL support is compiled in: an Interpreter is created here.
// NOTE(review): listing line 46, which presumably uses the interpreter to
// compile the rule string, is absent from this listing -- confirm against
// the real source file.
44 #ifndef _MADARA_NO_KARL_
45  expression::Interpreter interpreter;
47 #endif // _MADARA_NO_KARL_
48  }
49  else
50  {
52  "UdpTransportReadThread::init:" \
53  " no permanent rules were set\n");
54  }
55  }
56 }
57 
// cleanup: intentionally empty; nothing is released here.
// NOTE(review): the signature line is missing from this listing.
58 void
60 {
61 }
62 
// rebroadcast: packs `records` into the thread's buffer with prep_rebroadcast
// and, when that produces a positive byte count, sends the bytes through
// transport_.send_message. Nothing is prepared or sent when
// settings_.no_sending is set.
//   print_prefix - forwarded to prep_rebroadcast and used as the log prefix
//   header       - forwarded to prep_rebroadcast
//   records      - the records to rebroadcast
// NOTE(review): the signature line, the final prep_rebroadcast argument line
// and the logger-call lines are missing from this listing.
63 void
65  const char * print_prefix,
66  MessageHeader * header,
67  const knowledge::KnowledgeMap & records)
68 {
69  const QoSTransportSettings &settings_ = transport_.settings_;
70 
// space available for packing starts out as the full queue length
71  int64_t buffer_remaining = (int64_t) settings_.queue_length;
72  char * buffer = buffer_.get_ptr ();
73  int result (0);
74 
75  if (!settings_.no_sending)
76  {
77  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
78  settings_, print_prefix,
79  header, records,
81 
// a positive result is used as the number of bytes to send
82  if (result > 0)
83  {
84  result = transport_.send_message(buffer, result);
85 
87  "%s:" \
88  " Send bandwidth = %" PRIu64 " B/s\n",
89  print_prefix,
91  }
92  }
93 }
94 
95 void
97 {
98  const QoSTransportSettings &settings_ = transport_.settings_;
99 
100  if (settings_.no_receiving) {
101  return;
102  }
103 
104  // allocate a buffer to send
105  char * buffer = buffer_.get_ptr ();
106  static const char print_prefix[] = "UdpTransportReadThread::run";
107 
109  "%s:" \
110  " entering main service loop.\n", print_prefix);
111 
112  if (buffer == 0)
113  {
115  "%s:" \
116  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
117  print_prefix,
118  settings_.queue_length);
119 
120  return;
121  }
122 
124  "%s: entering a recv on the socket.\n",
125  print_prefix);
126 
127  udp::endpoint remote;
128  boost::system::error_code err;
129  size_t bytes_read = transport_.socket_.receive_from (
130  asio::buffer((void *)buffer, settings_.queue_length), remote,
131  udp::socket::message_flags{}, err);
132 
133  if (err == asio::error::would_block || bytes_read == 0)
134  {
136  "%s: no bytes to read. Proceeding to next wait\n", print_prefix);
137 
138  return;
139  } else if (err) {
141  "%s: unexpected error: %s. Proceeding to next wait\n", print_prefix,
142  err.message ().c_str ());
143 
144  return;
145  }
146 
147  if (remote.address ().to_string () != "")
148  {
151  "%s:" \
152  " received a message header of %lld bytes from %s:%d\n",
153  print_prefix,
154  (long long)bytes_read,
155  remote.address ().to_string ().c_str (), (int)remote.port ());
156  }
157  else
158  {
161  "%s:" \
162  " received %lld bytes from unknown host\n",
163  print_prefix,
164  (long long)bytes_read);
165  }
166 
167  MessageHeader * header = 0;
168 
169  std::stringstream remote_host;
170  remote_host << remote.address ().to_string ();
171  remote_host << ":";
172  remote_host << remote.port ();
173 
174  knowledge::KnowledgeMap rebroadcast_records;
175 
176  process_received_update (buffer, (uint32_t)bytes_read, transport_.id_, *context_,
178  rebroadcast_records,
179 #ifndef _MADARA_NO_KARL_
181 #endif // _MADARA_NO_KARL_
182  print_prefix,
183  remote_host.str ().c_str (), header);
184 
185  if (header)
186  {
187  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
188  settings_.get_participant_ttl () > 0)
189  {
190  --header->ttl;
191  header->ttl = std::min (
192  settings_.get_participant_ttl (), header->ttl);
193 
194  rebroadcast (print_prefix, header, rebroadcast_records);
195  }
196 
197  // delete header
198  delete header;
199  }
200 
202  "%s:" \
203  " finished iteration.\n",
204  print_prefix);
205 }
206 
207 } }
QoSTransportSettings settings_
Definition: Transport.h:133
long send_message(const char *buf, size_t size)
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
udp::socket socket_
underlying socket
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
void init(knowledge::KnowledgeBase &knowledge) override
Initializes MADARA context-related items.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:148
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
const std::string id_
host:port identifier of this process
Definition: Transport.h:131
std::string on_data_received_logic
logic to be evaluated after every successful update
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
Provides functions and classes for the distributed knowledge base.
void cleanup(void) override
Cleanup function called by thread manager.
madara::utility::ScopedArray< char > buffer_
buffer for receiving
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:151
Copyright (c) 2015 Carnegie Mellon University.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void run(void) override
The main loop internals for the read thread.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
UDP-based transport for knowledge.
Definition: UdpTransport.h:37
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:737
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:145
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...