MADARA  3.4.1
SpliceTransportReadThread.cpp
Go to the documentation of this file.
5 
6 namespace logger = madara::logger;
7 namespace utility = madara::utility;
8 
9 #include <iostream>
10 #include <sstream>
11 
// Constructor: tail of the parameter list, the member-initializer list, and
// an empty body. Every member shown is initialized directly from the
// identically named argument; context_ stores the address of `context`.
// NOTE(review): the embedded listing numbers skip 12 and 14, so the opening
// line of the signature and one parameter are not visible in this listing.
// The initializer list uses `id` and `context`, which are presumably declared
// on those missing lines (the signature summary later in this listing shows
// `const std::string &id` and `knowledge::ThreadSafeContext &context`).
13  const TransportSettings& settings,
15  Knowledge::UpdateDataReader_ptr& update_reader,
16  Knowledge::UpdateDataWriter_ptr& update_writer,
17  BandwidthMonitor& send_monitor, BandwidthMonitor& receive_monitor,
18  PacketScheduler& packet_scheduler)
19  : id_(id),
20  settings_(settings),
21  context_(&context),
22  update_reader_(update_reader),
23  update_writer_(update_writer),
24  send_monitor_(send_monitor),
25  receive_monitor_(receive_monitor),
26  packet_scheduler_(packet_scheduler)
27 {
// No work is done in the body; buffer allocation and rule compilation
// happen in init().
28 }
29 
// Body of init(knowledge): re-points context_ at the knowledge base's
// context, allocates the receive buffer, and compiles the optional
// on_data_received logic from the settings.
// NOTE(review): the signature line (listing line 31) is not visible here.
32 {
33  context_ = &(knowledge.get_context());
34 
35  // setup the receive buffer
// The allocation is skipped when queue_length is not positive, in which case
// buffer_ keeps whatever state it already had.
36  if (settings_.queue_length > 0)
37  buffer_ = new char[settings_.queue_length];
38 
// NOTE(review): context_ was just assigned from an address-of expression
// above, so this check looks like it can never fail — confirm.
39  if (context_)
40  {
41  // check for an on_data_received ruleset
42  if (settings_.on_data_received_logic.length() != 0)
43  {
// When _MADARA_NO_KARL_ is defined, everything up to the #endif is compiled
// out and this branch is empty (no log, no compile).
44 #ifndef _MADARA_NO_KARL_
// NOTE(review): the next three lines are presumably the trailing arguments of
// a logging call whose opening line (listing line 45) is not visible. The
// message prefix says "UdpTransportReadThread" although this file is the
// Splice read thread — looks like a copy-paste leftover; confirm before
// changing the runtime string.
46  "UdpTransportReadThread::init:"
47  " setting rules to %s\n",
48  settings_.on_data_received_logic.c_str());
49 
// Compile the configured logic once here; run() later passes
// on_data_received_ to process_received_update for each sample.
51  on_data_received_ = context_->compile(settings_.on_data_received_logic);
52 #endif // _MADARA_NO_KARL_
53  }
54  else
55  {
// NOTE(review): as above, the opening line of this logging call (listing
// line 56) is not visible, and the prefix names UdpTransportReadThread.
57  "UdpTransportReadThread::init:"
58  " no permanent rules were set\n");
59  }
60  }
61 }
62 
64 
// rebroadcast(print_prefix, header, records): serializes `records` into the
// receive buffer via prep_rebroadcast and, if anything was produced, wraps the
// bytes in a Knowledge::Update sample and writes it with update_writer_.
// NOTE(review): the first line of the signature (listing line 65) is not
// visible here; `print_prefix` is used below and is presumably declared there.
66  MessageHeader* header, const knowledge::KnowledgeMap& records)
67 {
68  int64_t buffer_remaining = (int64_t)settings_.queue_length;
69  char* buffer = buffer_.get_ptr();
// NOTE(review): the prep_rebroadcast summary later in this listing declares
// an `int` return, but the value is stored in an unsigned long. If a negative
// error code is ever returned it would convert to a large positive value and
// pass the `result > 0` test below — confirm the callee's return contract.
70  unsigned long result = prep_rebroadcast(*context_, buffer, buffer_remaining,
71  settings_, print_prefix, header, records, packet_scheduler_);
72 
// Nothing is sent when prep_rebroadcast produced no bytes.
73  if (result > 0)
74  {
// The size that is logged and fed to the bandwidth monitor is the payload
// size plus sizeof(Knowledge::Update).
75  ssize_t bytes_sent(result + sizeof(Knowledge::Update));
76  DDS::ReturnCode_t dds_result;
77  DDS::InstanceHandle_t handle;
78 
79  // get the maximum quality from the updates
80  uint32_t quality = knowledge::max_quality(records);
81 
83  unsigned long long cur_clock = context_->get_clock();
84 
85  Knowledge::Update data;
86 
// The sample's octet sequence is built over buffer_'s storage (the same
// pointer held in `buffer` above) with length and maximum both set to
// `result`.
87  data.buffer =
88  Knowledge::seq_oct(result, result, (unsigned char*)buffer_.get_ptr());
89  data.clock = cur_clock;
90  data.quality = quality;
91  data.updates = DDS::ULong(records.size());
92  data.originator = DDS::string_dup(id_.c_str());
94  data.ttl = settings_.get_rebroadcast_ttl();
95  data.timestamp = utility::get_time();
96  data.madara_id = DDS::string_dup(MADARA_IDENTIFIER);
97 
// NOTE(review): dds_result is assigned here but never read in this function,
// so a failed write is not detected.
98  handle = update_writer_->register_instance(data);
99  dds_result = update_writer_->write(data, handle);
100 
// NOTE(review): the lines below are presumably the trailing arguments of a
// logging call whose opening line (listing line 101) is not visible.
// bytes_sent is an ssize_t but is formatted with %d — possible specifier
// mismatch if the logger is printf-style; confirm.
102  "%s:"
103  " Sent packet of size %d\n",
104  print_prefix, bytes_sent);
105 
106  send_monitor_.add((uint32_t)bytes_sent);
107 
// NOTE(review): opening line of this logging call (listing line 108) is not
// visible either.
109  "%s:"
110  " Send bandwidth = %d B/s\n",
111  print_prefix, send_monitor_.get_bytes_per_second());
112  }
113 }
114 
// Body of run(): one pass of the read thread. It waits (up to 3 seconds) for
// data on update_reader_, takes the available samples, hands each valid
// MULTIASSIGN sample to process_received_update, rebroadcasts when the TTL
// rules allow it, and finally returns the loaned sample buffers to the reader.
// NOTE(review): the signature line (listing line 115) is not visible here.
116 {
117  DDS::SampleInfoSeq_var infoList = new DDS::SampleInfoSeq;
118  DDS::ReturnCode_t dds_result;
119  int amount;
120  DDS::Boolean result = false;
121  const char* print_prefix = "SpliceReadThread::svc";
122  Knowledge::UpdateSeq_var update_data_list_ = new Knowledge::UpdateSeq;
123 
// The wait set and its status condition are locals, so they are rebuilt on
// every call of this function.
124  DDS::WaitSet waitset_;
125  DDS::StatusCondition_ptr condition_;
126  // Add update datareader statuscondition to waitset
127  condition_ = update_reader_->get_statuscondition();
128  condition_->set_enabled_statuses(DDS::DATA_AVAILABLE_STATUS);
129  waitset_.attach_condition(condition_);
130 
131  ::DDS::Duration_t wait_time;
132  wait_time.sec = 3;
133  wait_time.nanosec = 0;
134 
135  // if we don't check originator for null, we get phantom sends
136  // when the program exits.
// (The null-originator check this comment refers to is inside the sample
// loop further down. The line that follows is presumably the tail of a
// logging call whose opening line, listing line 137, is not visible.)
138  "%s: entering processing loop.\n", print_prefix);
139 
140  // by using conditionals, we can wait for a message for a specific time limit
141  // the conditionList would tell us which particular conditions were met, but
142  // since only one condition (data available on update_reader_) is attached
// to the wait set above, the list is not inspected afterwards.
// NOTE(review): the value returned by wait() is stored in `result` and never
// checked, so the take below runs whether or not the wait reported data.
143  DDS::ConditionSeq_var conditionList = new DDS::ConditionSeq();
144  result = waitset_.wait(conditionList.inout(), wait_time);
145 
// NOTE(review): tail of a logging call; opening line (listing line 146) is
// not visible.
147  "%s: entering a take on the DDS reader.\n", print_prefix);
148 
// The literal 20 is presumably the maximum number of samples to take — confirm
// against the DDS take() signature. dds_result is not checked afterwards.
149  dds_result = update_reader_->take(update_data_list_, infoList, 20,
150  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
151 
152  amount = update_data_list_->length();
153 
154  if (amount != 0)
155  {
156  for (int i = 0; i < amount; ++i)
157  {
158  // if we are evaluating a message from ourselves, just continue
159  // to the next one. It's also possible to receive null originators
160  // from what I can only guess is the ospl daemon messing up
// NOTE(review): only the null-originator case is tested here; no comparison
// against id_ is visible in this function. id_ is passed to
// process_received_update below, which presumably handles self-originated
// messages — confirm.
161  if (!update_data_list_[i].originator.val())
162  {
163  // if we don't check originator for null, we get phantom sends
164  // when the program exits.
166  "%s: discarding null originator event.\n", print_prefix);
167 
168  continue;
169  }
170 
171  if (update_data_list_[i].type != madara::transport::MULTIASSIGN)
172  {
174  "%s: discarding non-assignment event.\n", print_prefix);
175  // we do not allow any other type than multiassign
176 
177  continue;
178  }
179 
// header is passed by reference-to-pointer (see the process_received_update
// summary later in this listing) and is deleted below when it comes back
// non-null; rebroadcast_records collects what should be forwarded.
180  knowledge::KnowledgeMap rebroadcast_records;
181  MessageHeader* header = 0;
182 
183  process_received_update((char*)update_data_list_[i].buffer.get_buffer(),
184  update_data_list_[i].buffer.length(), id_, *context_, settings_,
185  send_monitor_, receive_monitor_, rebroadcast_records,
186  on_data_received_, print_prefix, "", header);
187 
188  if (header)
189  {
// Rebroadcast only when the packet still has TTL left, there is something to
// forward, and this participant's own TTL setting is positive.
190  if (header->ttl > 0 && rebroadcast_records.size() > 0 &&
191  settings_.get_participant_ttl() > 0)
192  {
// Consume one hop, then cap the remaining TTL at the participant TTL.
193  --header->ttl;
194  header->ttl = std::min(settings_.get_participant_ttl(), header->ttl);
195 
196  rebroadcast(print_prefix, header, rebroadcast_records);
197  }
198 
199  // delete header
200  delete header;
201  }
202  }
203  }
204 
// Hand the sample and info sequences obtained by take() back to the reader.
// This runs even when no samples were received; dds_result is not checked.
205  dds_result = update_reader_->return_loan(update_data_list_, infoList);
206 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:22
const ThreadSafeContext * context_
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:43
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class stores variables and their values for use by any entity needing state information in a thr...
Provides monitoring capability of a transport's bandwidth.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
Provides scheduler for dropping packets.
void cleanup(void)
Cleanup function called by thread manager.
SpliceReadThread(const std::string &id, const TransportSettings &settings, knowledge::ThreadSafeContext &context, Knowledge::UpdateDataReader_ptr &update_reader, Knowledge::UpdateDataWriter_ptr &update_writer, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
void run(void)
The main loop internals for the read thread.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
Holds basic transport settings.
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:109
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:791
Provides utility functions and classes for common tasks and needs.
Definition: IteratorImpl.h:15
int64_t get_time(void)
Returns a time of day in nanoseconds. If the simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265