2 #include "madara/utility/LogMacros.h" 12 : id_ (id), context_ (context)
26 Knowledge::Update & data)
32 long long value = data.value;
36 "SpliceDataReaderListener::handle_assignment:" \
37 " waiting to process assignment\n");
53 data.quality, data.clock,
false);
61 "SpliceDataReaderListener::handle_assignment:" \
62 " received data[%s]=%q from %s\n",
63 key.c_str (), value, data.originator.val ());
69 "SpliceDataReaderListener::handle_assignment:" \
70 " discarded data[%s]=%q from %s as the value was already set\n",
71 key.c_str (), value, data.originator.val ());
73 else if (result == -1)
76 "SpliceDataReaderListener::handle_assignment:" \
77 " discarded data due to null key\n",
78 key.c_str (), value, data.originator.val ());
80 else if (result == -2)
83 "SpliceDataReaderListener::handle_assignment:" \
84 " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
85 key.c_str (), value, cur_quality, data.quality);
87 else if (result == -3)
90 "SpliceDataReaderListener::handle_assignment:" \
91 " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
92 key.c_str (), value, cur_clock, data.clock);
98 Knowledge::Update & data)
105 std::stringstream stream (data.key.val ());
108 "SpliceDataReaderListener::handle_multiassignment:" \
109 " waiting to process multiassignment\n");
114 "SpliceDataReaderListener::handle_multiassignment:" \
115 " processing multiassignment (%s)\n",
118 while (!stream.eof ())
120 stream >> key >> symbol >> value >> symbol;
131 data.quality, data.clock,
false);
137 "SpliceDataReaderListener::handle_multiassignment:" \
138 " received data[%s]=%q from %s\n",
139 key.c_str (), value, data.originator.val ());
142 else if (result == 0)
145 "SpliceDataReaderListener::handle_multiassignment:" \
146 " discarded data[%s]=%q from %s as the value was already set\n",
147 key.c_str (), value, data.originator.val ());
149 else if (result == -1)
152 "SpliceDataReaderListener::handle_multiassignment:" \
153 " discarded data due to null key\n",
154 key.c_str (), value, data.originator.val ());
156 else if (result == -2)
159 "SpliceDataReaderListener::handle_multiassignment:" \
160 " discarded data[%s]=%q due to lower quality (%u vs %u)\n",
161 key.c_str (), value, cur_quality, data.quality);
163 else if (result == -3)
166 "SpliceDataReaderListener::handle_multiassignment:" \
167 " discarded data[%s]=%q due to older timestamp (%Q vs %Q)\n",
168 key.c_str (), value, cur_clock, data.clock);
178 DDS::DataReader_ptr,
const DDS::SampleLostStatus &status)
184 DDS::DataReader_ptr,
const DDS::SampleRejectedStatus &status)
190 DDS::DataReader_ptr,
const DDS::RequestedIncompatibleQosStatus &status)
196 DDS::DataReader_ptr reader,
const DDS::RequestedDeadlineMissedStatus & status)
202 DDS::DataReader_ptr reader,
const DDS::LivelinessChangedStatus & status)
208 DDS::DataReader_ptr reader,
const DDS::SubscriptionMatchedStatus & status)
215 DDS::DataReader_ptr reader)
217 DDS::SampleInfoSeq_var infoList =
new DDS::SampleInfoSeq;
218 DDS::ReturnCode_t dds_result;
220 DDS::Boolean result =
false;
221 Knowledge::UpdateSeq_var update_data_list_ =
new Knowledge::UpdateSeq;
223 Knowledge::UpdateDataReader_ptr update_reader =
224 dynamic_cast<Knowledge::UpdateDataReader_ptr
> (reader);
226 if (update_reader == 0)
229 "SpliceDataReaderListener::on_data_available:" \
230 " Unable to create specialized reader. Leaving callback...\n");
235 dds_result = update_reader->take (update_data_list_, infoList, 1,
236 DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
238 amount = update_data_list_->length ();
242 for (
int i = 0; i < amount; ++i)
247 if (!update_data_list_[i].originator.val () ||
248 id_ == update_data_list_[i].originator.val ())
251 "SpliceDataReaderListener::on_data_available:" \
252 " discarding null originator event\n");
260 "SpliceDataReaderListener::on_data_available:" \
261 " processing %s=%q from %s with time %Q and quality %u\n",
262 update_data_list_[i].key.val (), update_data_list_[i].value,
263 update_data_list_[i].originator.val (),
264 update_data_list_[i].clock, update_data_list_[i].quality);
271 "SpliceDataReaderListener::on_data_available:" \
272 " processing multassignment from %s with time %Q and quality %u\n",
273 update_data_list_[i].originator.val (),
274 update_data_list_[i].clock, update_data_list_[i].quality);
282 dds_result = update_reader->return_loan (update_data_list_, infoList);
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
void lock(void) const
Locks the mutex on this context.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
knowledge::ThreadSafeContext & context_
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
DDS callback for incompatible qos.
void handle_assignment(Knowledge::Update &data)
void unlock(void) const
Unlocks the mutex on this context.
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
DDS callback for sample lost.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
void handle_multiassignment(Knowledge::Update &data)
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &status)
DDS callback for deadline being missed.
void on_data_available(DDS::DataReader_ptr reader)
void on_subscription_matched(DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus &status)
DDS callback for subscription matched.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
SpliceDataReaderListener(const std::string &id, knowledge::ThreadSafeContext &context)
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Container for DDS-related callbacks (deprecated and unused)
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &status)
DDS callback for liveliness changed.
~SpliceDataReaderListener()
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
DDS callback for sample rejected.