MADARA  3.2.3
SpliceDataReaderListener.cpp
Go to the documentation of this file.
2 #include "madara/utility/LogMacros.h"
5 
6 #include <iostream>
7 #include <sstream>
8 
// Constructor: initializes id_ from the id argument and context_ from the
// context argument; the body itself is empty.
// NOTE(review): source lines 9 and 11 (the function name and the context
// parameter declaration) are absent from this listing.
10  const std::string & id,
12 : id_ (id), context_ (context)
13 {
14 }
15 
// Copy constructor: takes id_ and context_ from the listener being copied;
// the body itself is empty.
// NOTE(review): source line 16 (the function name) is absent from this listing.
17  const SpliceDataReaderListener &ref)
18 : id_ (ref.id_), context_ (ref.context_)
19 {
20 }
21 
// Empty function body. NOTE(review): presumably the destructor -- its
// signature (source line 22) is absent from this listing; confirm.
23 {}
24 
// SpliceDataReaderListener::handle_assignment (name per the log prefix used
// below): applies one key/value update to the context and logs the outcome.
// Does nothing when data.key.val () is null.
// NOTE(review): this listing omits the first line of the signature and the
// logger-call opener that precedes each message string.
26  Knowledge::Update & data)
27 {
28  if (data.key.val ())
29  {
30  // the key is non-null here; copy out the fields used below
31  std::string key = data.key.val ();
32  long long value = data.value;
33  int result = 0;
34 
36  "SpliceDataReaderListener::handle_assignment:" \
37  " waiting to process assignment\n");
38 
 // the context is locked across the clock/quality reads and the
 // conditional set, and unlocked before the outcome is logged
39  context_.lock ();
40  unsigned long long cur_clock = context_.get_clock (key);
41  unsigned long cur_quality = context_.get_quality (key);
42 
43  // cur_clock and cur_quality are the values stored for this key before
44  // the update is attempted. They are only used by the log messages
45  // below, to report what a rejected update was compared against
46  // (quality for result -2, clock for result -3).
47 
48  // if the data we are updating had a lower clock value
49  // then that means this update is the latest value. Among
50  // other things, this means our solution will work even
51  // without FIFO channel transports
52  result = context_.set_if_unequal (key, value,
53  data.quality, data.clock, false);
54 
55  context_.unlock ();
56 
 // result codes, as interpreted by the branches below:
 // 1 updated, 0 value already set, -1 null key,
 // -2 lower quality, -3 older timestamp
57  // if we actually updated the value
58  if (result == 1)
59  {
61  "SpliceDataReaderListener::handle_assignment:" \
62  " received data[%s]=%q from %s\n",
63  key.c_str (), value, data.originator.val ());
64  }
65  // if the data was already current
66  else if (result == 0)
67  {
69  "SpliceDataReaderListener::handle_assignment:" \
70  " discarded data[%s]=%q from %s as the value was already set\n",
71  key.c_str (), value, data.originator.val ());
72  }
73  else if (result == -1)
74  {
76  "SpliceDataReaderListener::handle_assignment:" \
77  " discarded data due to null key\n",
78  key.c_str (), value, data.originator.val ());
79  }
80  else if (result == -2)
81  {
 // NOTE(review): %u is paired with cur_quality, which is declared
 // unsigned long above -- confirm the logger handles this width
83  "SpliceDataReaderListener::handle_assignment:" \
84  " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
85  key.c_str (), value, cur_quality, data.quality);
86  }
87  else if (result == -3)
88  {
90  "SpliceDataReaderListener::handle_assignment:" \
91  " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
92  key.c_str (), value, cur_clock, data.clock);
93  }
94  }
95 }
96 
98  Knowledge::Update & data)
99 {
100  if (data.key.val ())
101  {
102  std::string key;
103  char symbol;
104  long long value;
105  std::stringstream stream (data.key.val ());
106 
108  "SpliceDataReaderListener::handle_multiassignment:" \
109  " waiting to process multiassignment\n");
110 
111  context_.lock ();
112 
114  "SpliceDataReaderListener::handle_multiassignment:" \
115  " processing multiassignment (%s)\n",
116  data.key.val ());
117 
118  while (!stream.eof ())
119  {
120  stream >> key >> symbol >> value >> symbol;
121 
122  int result = 0;
123  unsigned long long cur_clock = context_.get_clock (key);
124  unsigned long cur_quality = context_.get_quality (key);
125 
126  // if the data we are updating had a lower clock value
127  // then that means this update is the latest value. Among
128  // other things, this means our solution will work even
129  // without FIFO channel transports
130  result = context_.set_if_unequal (key, value,
131  data.quality, data.clock, false);
132 
133  // if we actually updated the value
134  if (result == 1)
135  {
137  "SpliceDataReaderListener::handle_multiassignment:" \
138  " received data[%s]=%q from %s\n",
139  key.c_str (), value, data.originator.val ());
140  }
141  // if the data was already current
142  else if (result == 0)
143  {
145  "SpliceDataReaderListener::handle_multiassignment:" \
146  " discarded data[%s]=%q from %s as the value was already set\n",
147  key.c_str (), value, data.originator.val ());
148  }
149  else if (result == -1)
150  {
152  "SpliceDataReaderListener::handle_multiassignment:" \
153  " discarded data due to null key\n",
154  key.c_str (), value, data.originator.val ());
155  }
156  else if (result == -2)
157  {
159  "SpliceDataReaderListener::handle_multiassignment:" \
160  " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
161  key.c_str (), value, cur_quality, data.quality);
162  }
163  else if (result == -3)
164  {
166  "SpliceDataReaderListener::handle_multiassignment:" \
167  " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
168  key.c_str (), value, cur_clock, data.clock);
169  }
170  }
171 
172  context_.unlock ();
173  }
174 }
175 
// on_sample_lost: DDS callback for a lost sample. The body is empty, so the
// reader and status arguments are ignored.
176 void
178  DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
179 {
180 }
181 
// on_sample_rejected: DDS callback for a rejected sample. The body is empty,
// so the reader and status arguments are ignored.
182 void
184  DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
185 {
186 }
187 
// on_requested_incompatible_qos: DDS callback for an incompatible QoS
// request. The body is empty, so the reader and status arguments are ignored.
188 void
190  DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
191 {
192 }
193 
// on_requested_deadline_missed: DDS callback for a missed deadline. The body
// is empty, so the reader and status arguments are ignored.
194 void
196  DDS::DataReader_ptr reader, const DDS::RequestedDeadlineMissedStatus & status)
197 {
198 }
199 
// on_liveliness_changed: DDS callback for a liveliness change. The body is
// empty, so the reader and status arguments are ignored.
200 void
202  DDS::DataReader_ptr reader, const DDS::LivelinessChangedStatus & status)
203 {
204 }
205 
// on_subscription_matched: DDS callback for a matched subscription.
// NOTE(review): source line 210 (inside the body) is absent from this
// listing, so the body may not actually be empty -- confirm against the
// real file before relying on this callback being a no-op.
206 void
208  DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus & status)
209 {
211 }
212 
// SpliceDataReaderListener::on_data_available (name per the log prefix used
// below): takes pending updates from the reader and dispatches each one to
// handle_assignment or handle_multiassignment according to its type.
// Samples with a null originator, or whose originator equals id_, are
// skipped. Returns early if the reader cannot be cast to the update reader.
// NOTE(review): this listing omits the function-name line and the
// logger-call opener that precedes each message string.
213 void
215  DDS::DataReader_ptr reader)
216 {
217  DDS::SampleInfoSeq_var infoList = new DDS::SampleInfoSeq;
218  DDS::ReturnCode_t dds_result;
219  int amount;
 // NOTE(review): result is never read in this function
220  DDS::Boolean result = false;
221  Knowledge::UpdateSeq_var update_data_list_ = new Knowledge::UpdateSeq;
222 
223  Knowledge::UpdateDataReader_ptr update_reader =
224  dynamic_cast<Knowledge::UpdateDataReader_ptr> (reader);
225 
226  if (update_reader == 0)
227  {
229  "SpliceDataReaderListener::on_data_available:" \
230  " Unable to create specialized reader. Leaving callback...\n");
231 
232  return;
233  }
234 
 // NOTE(review): dds_result is assigned here and at return_loan below but
 // never checked; the third argument (1) presumably caps the number of
 // samples taken per callback -- confirm against the DDS take () API
235  dds_result = update_reader->take (update_data_list_, infoList, 1,
236  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
237 
238  amount = update_data_list_->length ();
239 
240  if (amount != 0)
241  {
242  for (int i = 0; i < amount; ++i)
243  {
244  // if we are evaluating a message from ourselves, just continue
245  // to the next one. It's also possible to receive null originators
246  // from what I can only guess is the ospl daemon messing up
 // NOTE(review): both cases share the one log message below, which
 // only mentions the null-originator case
247  if (!update_data_list_[i].originator.val () ||
248  id_ == update_data_list_[i].originator.val ())
249  {
251  "SpliceDataReaderListener::on_data_available:" \
252  " discarding null originator event\n");
253 
254  continue;
255  }
256 
257  if (madara::knowledge::ASSIGNMENT == update_data_list_[i].type)
258  {
260  "SpliceDataReaderListener::on_data_available:" \
261  " processing %s=%q from %s with time %Q and quality %u\n",
262  update_data_list_[i].key.val (), update_data_list_[i].value,
263  update_data_list_[i].originator.val (),
264  update_data_list_[i].clock, update_data_list_[i].quality);
265 
266  handle_assignment (update_data_list_[i]);
267  }
268  else if (madara::knowledge::MULTIPLE_ASSIGNMENT == update_data_list_[i].type)
269  {
271  "SpliceDataReaderListener::on_data_available:" \
272  " processing multassignment from %s with time %Q and quality %u\n",
273  update_data_list_[i].originator.val (),
274  update_data_list_[i].clock, update_data_list_[i].quality);
275 
276  handle_multiassignment (update_data_list_[i]);
277  }
278 
279  // updates of any other type fall through here and are ignored
280  }
281  }
 // return_loan is called whether or not any samples were processed
282  dds_result = update_reader->return_loan (update_data_list_, infoList);
283 
284 }
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
void lock(void) const
Locks the mutex on this context.
This class stores variables and their values for use by any entity needing state information in a thr...
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
static struct madara::knowledge::tags::string_t string
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
DDS callback for incompatible qos.
void unlock(void) const
Unlocks the mutex on this context.
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
DDS callback for sample lost.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &status)
DDS callback for deadline being missed.
void on_subscription_matched(DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus &status)
DDS callback for subscription matched.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
SpliceDataReaderListener(const std::string &id, knowledge::ThreadSafeContext &context)
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Container for DDS-related callbacks (deprecated and unused)
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &status)
DDS callback for liveliness changed.
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
DDS callback for sample rejected.