MADARA  3.2.3
SpliceTransportReadThread.cpp
Go to the documentation of this file.
5 
6 namespace logger = madara::logger;
7 namespace utility = madara::utility;
8 
9 #include <iostream>
10 #include <sstream>
11 
// SpliceReadThread constructor.
// NOTE(review): this extract is missing listing line 12 (the qualified
// constructor name) and listing line 15 (presumably the ThreadSafeContext
// parameter named `context`, since the initializer list takes its address).
// The body is empty: all work is done in the member initializer list, which
// stores the id, the settings, the address of the context, the DDS reader and
// writer handles, both bandwidth monitors and the packet scheduler.
13  const std::string & id,
14  const TransportSettings & settings,
16  Knowledge::UpdateDataReader_ptr & update_reader,
17  Knowledge::UpdateDataWriter_ptr & update_writer,
18  BandwidthMonitor & send_monitor,
19  BandwidthMonitor & receive_monitor,
20  PacketScheduler & packet_scheduler)
21  : id_ (id), settings_ (settings), context_ (&context),
22  update_reader_ (update_reader),
23  update_writer_ (update_writer),
24  send_monitor_ (send_monitor),
25  receive_monitor_ (receive_monitor),
26  packet_scheduler_ (packet_scheduler)
27 {
28 }
29 
// init: re-points context_ at the context of the supplied knowledge base,
// allocates the receive buffer, and reports whether an on_data_received
// ruleset is configured.
// NOTE(review): listing lines 31-32 (the signature), 47, 50, 52-53 and 58 are
// missing from this extract. The surviving string literals look like the
// arguments of logger calls, and lines 52-53 presumably compile
// settings_.on_data_received_logic into on_data_received_ - confirm against
// the real file.
30 void
33 {
// Overwrites the context pointer that the constructor already set.
34  context_ = &(knowledge.get_context ());
35 
36  // setup the receive buffer
// buffer_ is only assigned when queue_length is non-zero; rebroadcast reads
// buffer_.get_ptr () without a check visible in this extract.
37  if (settings_.queue_length > 0)
38  buffer_ = new char[settings_.queue_length];
39 
// NOTE(review): context_ was just assigned the address of a reference, so
// this test is expected to always pass.
40  if (context_)
41  {
42  // check for an on_data_received ruleset
43  if (settings_.on_data_received_logic.length () != 0)
44  {
45 
// NOTE(review): the message prefix below names UdpTransportReadThread although
// this file is SpliceTransportReadThread.cpp - looks copy-pasted; confirm
// before relying on it when grepping logs.
46 #ifndef _MADARA_NO_KARL_
48  "UdpTransportReadThread::init:" \
49  " setting rules to %s\n",
51 
54 #endif // _MADARA_NO_KARL_
55  }
56  else
57  {
// Same Udp-prefixed message as above (see the NOTE there).
59  "UdpTransportReadThread::init:" \
60  " no permanent rules were set\n");
61  }
62  }
63 }
64 
// cleanup: intentionally empty in the visible code - nothing is released or
// reset here. (Listing line 66, the signature, is missing from this extract.)
65 void
67 {
68 }
69 
// rebroadcast: serializes `records` into buffer_ via prep_rebroadcast and, if
// that yields a positive size, wraps the bytes in a Knowledge::Update sample
// and writes it through update_writer_.
//   print_prefix - text placed in front of the log messages
//   header       - message header forwarded to prep_rebroadcast
//   records      - knowledge records to send
// NOTE(review): listing lines 71 (signature), 81 (last prep_rebroadcast
// argument, presumably packet_scheduler_), 92, 111, 119 and 123 are missing
// from this extract; 111 and 119 look like logger-call openers.
70 void
72  const char * print_prefix,
73  MessageHeader * header,
74  const knowledge::KnowledgeMap & records)
75 {
76  int64_t buffer_remaining = (int64_t) settings_.queue_length;
77  char * buffer = buffer_.get_ptr ();
// NOTE(review): prep_rebroadcast is declared as returning int; storing it in
// an unsigned long means a negative return would satisfy `result > 0` below.
// Confirm whether it can return a negative value.
78  unsigned long result = prep_rebroadcast (*context_, buffer, buffer_remaining,
79  settings_, print_prefix,
80  header, records,
82 
83  if (result > 0)
84  {
// Reported size = serialized payload plus sizeof the DDS Update struct.
85  ssize_t bytes_sent (result + sizeof (Knowledge::Update));
86  DDS::ReturnCode_t dds_result;
87  DDS::InstanceHandle_t handle;
88 
89  // get the maximum quality from the updates
90  uint32_t quality = knowledge::max_quality (records);
91 
93  unsigned long long cur_clock = context_->get_clock ();
94 
95  Knowledge::Update data;
96 
// The octet sequence is built directly over buffer_'s storage; presumably the
// two `result` arguments are maximum and length - TODO confirm, along with
// who owns the memory afterwards.
97  data.buffer = Knowledge::seq_oct (result, result,
98  (unsigned char *)buffer_.get_ptr ());
99  data.clock = cur_clock;
100  data.quality = quality;
101  data.updates = DDS::ULong (records.size ());
102  data.originator = DDS::string_dup(id_.c_str ());
103  data.type = madara::transport::MULTIASSIGN;
104  data.ttl = settings_.get_rebroadcast_ttl ();
105  data.timestamp = utility::get_time ();
106  data.madara_id = DDS::string_dup(MADARA_IDENTIFIER);
107 
// dds_result is assigned but not inspected in the visible lines, so a failed
// write is not detected here.
108  handle = update_writer_->register_instance (data);
109  dds_result = update_writer_->write (data, handle);
110 
// NOTE(review): bytes_sent is ssize_t but the format below uses %d.
112  "%s:" \
113  " Sent packet of size %d\n",
114  print_prefix,
115  bytes_sent);
116 
117  send_monitor_.add ((uint32_t)bytes_sent);
118 
// The bandwidth argument (listing line 123) is missing from this extract.
120  "%s:" \
121  " Send bandwidth = %d B/s\n",
122  print_prefix,
124  }
125 }
126 
// run: a single pass of the read thread. It waits on a DDS waitset with a
// 3 second timeout, takes samples from update_reader_, hands each
// MULTIASSIGN sample to process_received_update, rebroadcasts when the TTL
// allows it, and finally returns the loaned samples to the reader.
// NOTE(review): listing lines 128 (signature), 150, 159, 178 and 186 are
// missing from this extract; the orphaned string literals look like the
// arguments of logger calls.
127 void
129 {
130  DDS::SampleInfoSeq_var infoList = new DDS::SampleInfoSeq;
131  DDS::ReturnCode_t dds_result;
132  int amount;
133  DDS::Boolean result = false;
// The log prefix says "svc" although the tooltip for this method names it
// run - left as is because it is runtime text.
134  const char * print_prefix = "SpliceReadThread::svc";
135  Knowledge::UpdateSeq_var update_data_list_ = new Knowledge::UpdateSeq;
136 
137  DDS::WaitSet waitset_;
138  DDS::StatusCondition_ptr condition_;
139  // Add update datareader statuscondition to waitset
140  condition_ = update_reader_->get_statuscondition ();
141  condition_->set_enabled_statuses (DDS::DATA_AVAILABLE_STATUS);
142  waitset_.attach_condition (condition_);
143 
144  ::DDS::Duration_t wait_time;
145  wait_time.sec = 3;
146  wait_time.nanosec = 0;
147 
// NOTE(review): the two-line comment below appears misplaced here; the same
// text sits next to the null-originator check further down, where it applies.
148  // if we don't check originator for null, we get phantom sends
149  // when the program exits.
151  "%s: entering processing loop.\n", print_prefix);
152 
153  // by using conditionals, we can wait for a message for a specific time limit
154  // the conditionList would tell us which particular conditions were met, but
155  // since we've only set up the wait
156  DDS::ConditionSeq_var conditionList = new DDS::ConditionSeq();
// `result` is not read in the visible lines: the take below runs whether the
// wait was triggered or timed out.
157  result = waitset_.wait (conditionList.inout (), wait_time);
158 
160  "%s: entering a take on the DDS reader.\n", print_prefix);
161 
// 20 is passed as the third argument (presumably the maximum number of
// samples - TODO confirm). dds_result is not inspected in the visible lines.
162  dds_result = update_reader_->take (update_data_list_, infoList, 20,
163  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
164 
165  amount = update_data_list_->length ();
166 
167  if (amount != 0)
168  {
169  for (int i = 0; i < amount; ++i)
170  {
// NOTE(review): the comment below mentions skipping our own messages, but the
// visible check only rejects a null originator; self-originated filtering
// presumably happens inside process_received_update (id_ is passed to it).
171  // if we are evaluating a message from ourselves, just continue
172  // to the next one. It's also possible to receive null originators
173  // from what I can only guess is the ospl daemon messing up
174  if (!update_data_list_[i].originator.val ())
175  {
176  // if we don't check originator for null, we get phantom sends
177  // when the program exits.
179  "%s: discarding null originator event.\n", print_prefix);
180 
181  continue;
182  }
183 
184  if (update_data_list_[i].type != madara::transport::MULTIASSIGN)
185  {
187  "%s: discarding non-assignment event.\n", print_prefix);
188  // we do not allow any other type than multiassign
189 
190  continue;
191  }
192 
193  knowledge::KnowledgeMap rebroadcast_records;
// header starts null and is passed by reference; it is presumably allocated by
// process_received_update, and this function deletes it below when non-null.
194  MessageHeader * header = 0;
195 
// The empty string is the remote_host argument.
196  process_received_update ((char *)update_data_list_[i].buffer.get_buffer (),
197  update_data_list_[i].buffer.length (), id_, *context_,
198  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
199  on_data_received_, print_prefix,
200  "", header);
201 
202  if (header)
203  {
// Rebroadcast only if the message still has TTL, there are records to
// forward, and this participant's own TTL setting is non-zero.
204  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
205  settings_.get_participant_ttl () > 0)
206  {
// Spend one hop, then cap the remaining TTL at the participant TTL.
207  --header->ttl;
208  header->ttl = std::min (
209  settings_.get_participant_ttl (), header->ttl);
210 
211  rebroadcast (print_prefix, header, rebroadcast_records);
212  }
213 
214  // delete header
215  delete header;
216  }
217 
218  }
219  }
220 
// Runs on every pass, including when no samples were taken.
221  dds_result = update_reader_->return_loan (update_data_list_, infoList);
222 }
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
SpliceReadThread(const std::string &id, const TransportSettings &settings, knowledge::ThreadSafeContext &context, Knowledge::UpdateDataReader_ptr &update_reader, Knowledge::UpdateDataWriter_ptr &update_writer, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
::Knowledge::UpdateDataWriter_var update_writer_
The DDS data writer that we can write to.
const QoSTransportSettings settings_
Transport settings.
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:22
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
int64_t get_time(void)
Returns a time of day in nanoseconds. If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:253
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
madara::utility::ScopedArray< char > buffer_
buffer for receiving
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
static struct madara::knowledge::tags::string_t string
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
::Knowledge::UpdateDataReader_var update_reader_
The DDS data reader that we will take from.
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
void cleanup(void)
Cleanup function called by thread manager.
std::string on_data_received_logic
logic to be evaluated after every successful update
uint32_t queue_length
Length of the buffer used to store history of events.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
Provides utility functions and classes for common tasks and needs.
Definition: IteratorImpl.h:14
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
const std::string id_
Unique identifier for this entity (e.g., host:port)
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:737
knowledge::ThreadSafeContext * context_
The knowledge context that we will be updating.
void run(void)
The main loop internals for the read thread.