MADARA  3.4.1
SpliceDataReaderListener.cpp
Go to the documentation of this file.
2 #include "madara/utility/LogMacros.h"
5 
6 #include <iostream>
7 #include <sstream>
8 
11  : id_(id), context_(context)
12 {
13 }
14 
16  const SpliceDataReaderListener& ref)
17  : id_(ref.id_), context_(ref.context_)
18 {
19 }
20 
22 
24  Knowledge::Update& data)
25 {
26  if (data.key.val())
27  {
28  // the key is non-null, so process the update (messages from ourselves are filtered out in on_data_available)
29  std::string key = data.key.val();
30  long long value = data.value;
31  int result = 0;
32 
34  "SpliceDataReaderListener::handle_assignment:"
35  " waiting to process assignment\n");
36 
37  context_.lock();
38  unsigned long long cur_clock = context_.get_clock(key);
39  unsigned long cur_quality = context_.get_quality(key);
40 
41  // if the data we are updating had a lower clock value or less quality
42  // then that means this update is the latest value. Among
43  // other things, this means our solution will work even
44  // without FIFO channel transports
45 
46  // the return code of set_if_unequal is interpreted below:
47  // 1 = the value was updated, 0 = the value was already set,
48  // -1 = null key, -2 = the update had lower quality than the
49  // current value, -3 = the update had an older timestamp
50  result =
51  context_.set_if_unequal(key, value, data.quality, data.clock, false);
52 
53  context_.unlock();
54 
55  // if we actually updated the value
56  if (result == 1)
57  {
59  "SpliceDataReaderListener::handle_assignment:"
60  " received data[%s]=%q from %s\n",
61  key.c_str(), value, data.originator.val());
62  }
63  // if the data was already current
64  else if (result == 0)
65  {
67  "SpliceDataReaderListener::handle_assignment:"
68  " discarded data[%s]=%q from %s as the value was already set\n",
69  key.c_str(), value, data.originator.val());
70  }
71  else if (result == -1)
72  {
74  "SpliceDataReaderListener::handle_assignment:"
75  " discarded data due to null key\n",
76  key.c_str(), value, data.originator.val());
77  }
78  else if (result == -2)
79  {
81  "SpliceDataReaderListener::handle_assignment:"
82  " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
83  key.c_str(), value, cur_quality, data.quality);
84  }
85  else if (result == -3)
86  {
88  "SpliceDataReaderListener::handle_assignment:"
89  " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
90  key.c_str(), value, cur_clock, data.clock);
91  }
92  }
93 }
94 
96  Knowledge::Update& data)
97 {
98  if (data.key.val())
99  {
100  std::string key;
101  char symbol;
102  long long value;
103  std::stringstream stream(data.key.val());
104 
106  "SpliceDataReaderListener::handle_multiassignment:"
107  " waiting to process multiassignment\n");
108 
109  context_.lock();
110 
112  "SpliceDataReaderListener::handle_multiassignment:"
113  " processing multiassignment (%s)\n",
114  data.key.val());
115 
116  while (!stream.eof())
117  {
118  stream >> key >> symbol >> value >> symbol;
119 
120  int result = 0;
121  unsigned long long cur_clock = context_.get_clock(key);
122  unsigned long cur_quality = context_.get_quality(key);
123 
124  // if the data we are updating had a lower clock value
125  // then that means this update is the latest value. Among
126  // other things, this means our solution will work even
127  // without FIFO channel transports
128  result =
129  context_.set_if_unequal(key, value, data.quality, data.clock, false);
130 
131  // if we actually updated the value
132  if (result == 1)
133  {
135  "SpliceDataReaderListener::handle_multiassignment:"
136  " received data[%s]=%q from %s\n",
137  key.c_str(), value, data.originator.val());
138  }
139  // if the data was already current
140  else if (result == 0)
141  {
143  "SpliceDataReaderListener::handle_multiassignment:"
144  " discarded data[%s]=%q from %s as the value was already set\n",
145  key.c_str(), value, data.originator.val());
146  }
147  else if (result == -1)
148  {
150  "SpliceDataReaderListener::handle_multiassignment:"
151  " discarded data due to null key\n",
152  key.c_str(), value, data.originator.val());
153  }
154  else if (result == -2)
155  {
157  "SpliceDataReaderListener::handle_multiassignment:"
158  " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
159  key.c_str(), value, cur_quality, data.quality);
160  }
161  else if (result == -3)
162  {
164  "SpliceDataReaderListener::handle_multiassignment:"
165  " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
166  key.c_str(), value, cur_clock, data.clock);
167  }
168  }
169 
170  context_.unlock();
171  }
172 }
173 
175  DDS::DataReader_ptr, const DDS::SampleLostStatus& status)
176 {
177 }
178 
180  DDS::DataReader_ptr, const DDS::SampleRejectedStatus& status)
181 {
182 }
183 
185  DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus& status)
186 {
187 }
188 
190  DDS::DataReader_ptr reader,
191  const DDS::RequestedDeadlineMissedStatus& status)
192 {
193 }
194 
196  DDS::DataReader_ptr reader, const DDS::LivelinessChangedStatus& status)
197 {
198 }
199 
201  DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus& status)
202 {
203  context_.set_changed();
204 }
205 
207  DDS::DataReader_ptr reader)
208 {
209  DDS::SampleInfoSeq_var infoList = new DDS::SampleInfoSeq;
210  DDS::ReturnCode_t dds_result;
211  int amount;
212  DDS::Boolean result = false;
213  Knowledge::UpdateSeq_var update_data_list_ = new Knowledge::UpdateSeq;
214 
215  Knowledge::UpdateDataReader_ptr update_reader =
216  dynamic_cast<Knowledge::UpdateDataReader_ptr>(reader);
217 
218  if (update_reader == 0)
219  {
221  "SpliceDataReaderListener::on_data_available:"
222  " Unable to create specialized reader. Leaving callback...\n");
223 
224  return;
225  }
226 
227  dds_result = update_reader->take(update_data_list_, infoList, 1,
228  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
229 
230  amount = update_data_list_->length();
231 
232  if (amount != 0)
233  {
234  for (int i = 0; i < amount; ++i)
235  {
236  // if we are evaluating a message from ourselves, just continue
237  // to the next one. It's also possible to receive null originators
238  // from what I can only guess is the ospl daemon messing up
239  if (!update_data_list_[i].originator.val() ||
240  id_ == update_data_list_[i].originator.val())
241  {
243  "SpliceDataReaderListener::on_data_available:"
244  " discarding null originator event\n");
245 
246  continue;
247  }
248 
249  if (madara::knowledge::ASSIGNMENT == update_data_list_[i].type)
250  {
252  "SpliceDataReaderListener::on_data_available:"
253  " processing %s=%q from %s with time %Q and quality %u\n",
254  update_data_list_[i].key.val(), update_data_list_[i].value,
255  update_data_list_[i].originator.val(), update_data_list_[i].clock,
256  update_data_list_[i].quality);
257 
258  handle_assignment(update_data_list_[i]);
259  }
261  update_data_list_[i].type)
262  {
264  "SpliceDataReaderListener::on_data_available:"
265  " processing multassignment from %s with time %Q and quality %u\n",
266  update_data_list_[i].originator.val(), update_data_list_[i].clock,
267  update_data_list_[i].quality);
268 
269  handle_multiassignment(update_data_list_[i]);
270  }
271 
272  // any other update type is not handled here and is skipped
273  }
274  }
275  dds_result = update_reader->return_loan(update_data_list_, infoList);
276 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
const ThreadSafeContext * context_
This class stores variables and their values for use by any entity needing state information in a thr...
Container for DDS-related callbacks (deprecated and unused)
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
DDS callback for sample rejected.
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
DDS callback for incompatible qos.
void on_subscription_matched(DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus &status)
DDS callback for subscription matched.
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
DDS callback for sample lost.
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &status)
DDS callback for liveliness changed.
SpliceDataReaderListener(const std::string &id, knowledge::ThreadSafeContext &context)
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &status)
DDS callback for deadline being missed.
constexpr string_t string