MADARA  3.4.1
NativeCircularBufferConsumer.inl
Go to the documentation of this file.
1 
2 #ifndef _MADARA_KNOWLEDGE_CONTAINERS_NATIVECIRCULARBUFFERCONSUMER_INL_
3 #define _MADARA_KNOWLEDGE_CONTAINERS_NATIVECIRCULARBUFFERCONSUMER_INL_
4 
5 #include <sstream>
6 #include <math.h>
7 
13 
14 namespace madara
15 {
16 namespace knowledge
17 {
18 namespace containers
19 {
21  : context_(0), local_index_(-1)
22 {
23 }
24 
26  const char* func, const char* name)
27 {
28  if (name == nullptr || name[0] == '\0')
29  {
31  std::string("NativeCircularBufferConsumer::") + func +
32  ": name is empty.");
33  }
34 }
35 
36 inline void NativeCircularBufferConsumer::check_context(const char* func) const
37 {
38  if (!context_)
39  {
41  std::string("NativeCircularBufferConsumer::") + func +
42  ": context is not set.");
43  }
44 
45  if (!ref_.is_valid())
46  {
48  std::string("NativeCircularBufferConsumer::") + func +
49  ": underlying record is not set.");
50  }
51 }
52 
54  const std::string& name, KnowledgeBase& knowledge)
55  : context_(&(knowledge.get_context())), local_index_(0UL - 1)
56 {
57  check_name(__func__, name.c_str());
58 
59  ContextGuard context_guard(knowledge);
60 
61  set_name(name, knowledge);
62 }
63 
65  const std::string& name, Variables& knowledge)
66  : context_(knowledge.get_context()), local_index_(0UL - 1)
67 {
68  check_name(__func__, name.c_str());
69 
70  ContextGuard context_guard(*knowledge.get_context());
71 
72  set_name(name, knowledge);
73 }
74 
76  const NativeCircularBufferConsumer& value) const
77 {
78  return context_ == value.context_ && get_name() == value.get_name();
79 }
80 
82  const NativeCircularBufferConsumer& value) const
83 {
84  return !operator==(value);
85 }
86 
87 template<typename T>
88 void NativeCircularBufferConsumer::consume(T& value, size_t& dropped) const
89 {
90  //value = consume(dropped).to_any<T>();
91  value = knowledge_cast<T>(consume(dropped));
92 }
93 
94 template<typename T>
96  size_t count, std::vector<T>& values) const
97 {
98  // iterate over the returned records
99  for (auto record : peek_latest(count))
100  {
101  // add them to the values
102  if (record.is_valid())
103  //values.push_back(record.to_any<T>());
104  values.push_back(knowledge_cast<T>(record));
105  }
106 }
107 
108 inline std::vector<KnowledgeRecord> NativeCircularBufferConsumer::peek_latest(
109  size_t count) const
110 {
111  ContextGuard context_guard(*context_);
112 
114 
115  size_t newest_index = rec.get_history_newest_index();
116 
117  if (local_index_ + count > newest_index + 1)
118  {
119  count = (newest_index + 1) - local_index_;
120  }
121 
122  std::vector<KnowledgeRecord> ret_vec = rec.get_newest(count);
123 
124  std::reverse(ret_vec.begin(), ret_vec.end());
125 
126  return ret_vec;
127 }
128 
131 {
132  ContextGuard context_guard(*context_);
133 
135 
136  size_t newest_index = rec.get_history_newest_index();
137 
138  if (local_index_ >= newest_index + 1)
139  {
140  return KnowledgeRecord();
141  }
142 
143  return rec.get_newest();
144 }
145 
146 template<typename T>
148  size_t count, std::vector<T>& values) const
149 {
150  // iterate over the returned records
151  for (auto record : consume_latest(count))
152  {
153  // add them to the values
154  if (record.is_valid())
155  values.push_back(knowledge_cast<T>(record));
156  }
157 }
158 
159 template<typename T>
161  size_t count, std::vector<T>& values, size_t& dropped) const
162 {
163  dropped = get_dropped();
164 
165  // iterate over the returned records
166  for (auto record : consume_latest(count))
167  {
168  // add them to the values
169  if (record.is_valid())
170  values.push_back(knowledge_cast<T>(record));
171  }
172 }
173 
174 inline std::vector<KnowledgeRecord>
176 {
177  ContextGuard context_guard(*context_);
178 
180 
181  size_t newest_index = rec.get_history_newest_index();
182 
183  std::vector<KnowledgeRecord> result;
184 
185  if (local_index_ + count > newest_index + 1)
186  {
187  count = (newest_index + 1) - local_index_;
188  }
189 
190  local_index_ = newest_index + 1;
191 
192  std::vector<KnowledgeRecord> ret_vec = rec.get_newest(count);
193 
194  std::reverse(ret_vec.begin(), ret_vec.end());
195 
196  return ret_vec;
197 }
198 
201 {
202  ContextGuard context_guard(*context_);
203 
205 
206  size_t newest_index = rec.get_history_newest_index();
207 
208  KnowledgeRecord ret;
209 
210  if (local_index_ < newest_index + 1)
211  {
212  ret = rec.get_newest();
213  }
214 
215  local_index_ = newest_index + 1;
216 
217  return ret;
218 }
219 
220 inline std::vector<KnowledgeRecord>
222  size_t count, size_t& dropped) const
223 {
224  ContextGuard context_guard(*context_);
225 
227 
228  size_t newest_index = rec.get_history_newest_index();
229 
230  dropped = get_dropped();
231 
232  if (local_index_ + count > newest_index)
233  {
234  count = newest_index - local_index_;
235  }
236 
237  local_index_ = newest_index;
238 
239  return rec.get_newest(count);
240 }
241 
243  void) const
244 {
245  size_t dropped = 0;
246 
247  return consume(dropped);
248 }
249 
251  size_t& dropped) const
252 {
253  ContextGuard context_guard(*context_);
254 
256 
257  size_t oldest_index = rec.get_history_oldest_index();
258 
259  if (remaining() == 0)
260  {
261  return KnowledgeRecord();
262  }
263 
264  if (local_index_ < oldest_index)
265  {
266  dropped = oldest_index - local_index_;
267  local_index_ = oldest_index;
268  }
269  else
270  {
271  dropped = 0;
272  }
273 
274  KnowledgeRecord ret;
275 
276  rec.get_history_range(&ret, local_index_, 1);
277  ++local_index_;
278 
279  return ret;
280 }
281 
282 template<typename T>
284  size_t count, std::vector<T>& values) const
285 {
286  for (auto record : consume_many(count))
287  {
288  //values.push_back(record.to_any<T>());
289  values.push_back(knowledge_cast<T>(record));
290  }
291 }
292 
293 inline std::vector<KnowledgeRecord> NativeCircularBufferConsumer::consume_many(
294  size_t count, size_t& dropped) const
295 {
296  check_context(__func__);
297 
298  ContextGuard context_guard(*context_);
299 
300  dropped = get_dropped();
301 
302  std::vector<KnowledgeRecord> ret_vec;
303  for (size_t consume_counter = 0; consume_counter < count; ++consume_counter)
304  {
305  ret_vec.emplace_back(consume());
306  }
307 
308  return ret_vec;
309 }
310 
311 inline std::vector<KnowledgeRecord> NativeCircularBufferConsumer::consume_many(
312  size_t count) const
313 {
314  check_context(__func__);
315 
316  ContextGuard context_guard(*context_);
317 
318  std::vector<KnowledgeRecord> ret_vec;
319  for (size_t consume_counter = 0; consume_counter < count; ++consume_counter)
320  {
321  ret_vec.emplace_back(consume());
322  }
323 
324  return ret_vec;
325 }
326 
327 template<typename T>
329  KnowledgeRecord::Integer position, T& value) const
330 {
331  //value = inspect(position).to_any<T>();
332  value = knowledge_cast<T>(inspect(position));
333 }
334 
336  KnowledgeRecord::Integer position) const
337 {
338  check_context(__func__);
339 
340  ContextGuard context_guard(*context_);
341 
343 
344  KnowledgeRecord ret;
345  rec.get_history_range(&ret, local_index_ + position, 1);
346 
347  return ret;
348 }
349 
350 template<typename T>
352  size_t count, std::vector<T>& values) const
353 {
354  // iterate over the returned records
355  for (auto record : inspect(position, count))
356  {
357  //values.push_back(record.to_any<T>());
358  values.push_back(knowledge_cast<T>(record));
359  }
360 }
361 
362 inline std::vector<KnowledgeRecord> NativeCircularBufferConsumer::inspect(
363  KnowledgeRecord::Integer position, size_t count) const
364 {
365  check_context(__func__);
366 
367  ContextGuard context_guard(*context_);
368 
370 
371  size_t oldest_index = rec.get_history_oldest_index();
372 
373  if (local_index_ < oldest_index)
374  {
375  // Messages were dropped
376  local_index_ = oldest_index;
377  }
378 
379  KnowledgeRecord ret;
380  std::vector<KnowledgeRecord> ret_vec;
381  for (size_t inspect_counter = 0; inspect_counter < count; ++inspect_counter)
382  {
383  if (rec.get_history_range(
384  &ret, local_index_ + position + inspect_counter, 1))
385  {
386  ret_vec.emplace_back(std::move(ret));
387  }
388  }
389 
390  return ret_vec;
391 }
392 
394 {
395  return ref_.get_name();
396 }
397 
398 inline size_t NativeCircularBufferConsumer::size(void) const
399 {
400  check_context(__func__);
401 
402  ContextGuard context_guard(*context_);
403 
405 
406  return rec.get_history_capacity();
407 }
408 
410 {
411  check_context(__func__);
412 
413  ContextGuard context_guard(*context_);
414 
416 
417  size_t newest_index = rec.get_history_newest_index();
418 
419  if (local_index_ > newest_index)
420  {
421  return 0;
422  }
423 
424  return newest_index + 1 - local_index_;
425 }
426 
427 inline size_t NativeCircularBufferConsumer::count(void) const
428 {
429  check_context(__func__);
430 
431  ContextGuard context_guard(*context_);
432 
434  return rec.get_history_size();
435 }
436 
438  const std::string& name, ThreadSafeContext& context)
439 {
440  check_name(__func__, name.c_str());
441 
442  ContextGuard context_guard(context);
443 
444  context_ = &context;
445  ref_ = context_->get_ref(name);
446  local_index_ = 0;
447 }
448 
450  const std::string& name, KnowledgeBase& knowledge)
451 {
452  set_name(name, knowledge.get_context());
453 }
454 
456  const std::string& name, Variables& knowledge)
457 {
458  set_name(name, *knowledge.get_context());
459 }
460 
462 {
463  check_context(__func__);
464 
465  ContextGuard context_guard(*context_);
466 
467  local_index_ = index;
468 }
469 
471 {
472  check_context(__func__);
473 
474  ContextGuard context_guard(*context_);
475 
477 
478  size_t oldest_index = rec.get_history_oldest_index();
479 
480  if (local_index_ < oldest_index)
481  {
482  return oldest_index - local_index_;
483  }
484 
485  return 0;
486 }
487 
489 {
490  check_context(__func__);
491 
492  ContextGuard context_guard(*context_);
493 
494  return *ref_.get_record_unsafe();
495 }
496 
497 } // end containers namespace
498 } // end knowledge namespace
499 } // end madara namespace
500 
501 #endif // _MADARA_KNOWLEDGE_CONTAINERS_NATIVECIRCULARBUFFERCONSUMER_INL_
const ThreadSafeContext * context_
madara::knowledge::KnowledgeRecord KnowledgeRecord
An exception for attempting to access an invalid context.
An exception for setting an invalid name in MADARA.
Definition: NameException.h:16
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
size_t get_history_capacity() const
Return the maximum amount of history this record can hold.
size_t get_history_newest_index() const
Gets the absolute index of the newest element in stored history.
size_t get_history_oldest_index() const
Gets the absolute index of the oldest element in stored history.
size_t get_history_range(OutputIterator out, size_t index, size_t count) const
Copy the given absolute range of history to the output iterator given.
size_t get_history_size() const
Return the amount of history this record holds.
size_t get_newest(OutputIterator out, size_t count) const
Copy the count newest stored history entries of this record to the given output iterator,...
This class stores variables and their values for use by any entity needing state information in a thr...
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
const char * get_name(void) const
Returns the name of the variable.
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord. Do not use this pointer unless you've locked the ...
Provides an interface for external functions into the MADARA KaRL variable settings.
Definition: Variables.h:53
This class provides an interface similar to CircularBufferConsumer, which uses the internal history b...
VariableReference ref_
Reference to underlying record we are reading.
void inspect(KnowledgeRecord::Integer position, T &value) const
Retrieves a record at a position relative to local index.
void consume_many(size_t count, std::vector< T > &values) const
Consumes (earliest) records from the local index.
void set_index(size_t index)
Sets the local index to an arbitrary position.
std::string get_name(void) const
Returns the name of the variable.
madara::knowledge::KnowledgeRecord consume(void) const
Consumes the record at the local index (not the producer index)
madara::knowledge::KnowledgeRecord consume_latest(void) const
Consumes the latest record at the local index (not the producer index).
void set_name(const std::string &name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
KnowledgeRecord get_record() const
Get the KnowledgeRecord this container refers to.
ThreadSafeContext * context_
Variable context that we are modifying.
size_t get_dropped(void) const
Returns the number of known drops since last consume.
bool operator==(const NativeCircularBufferConsumer &value) const
Checks for equality.
size_t count(void) const
Returns the number of records in the NativeCircularBufferConsumer.
size_t remaining(void) const
Returns the number of records remaining that have not been consumed.
madara::knowledge::KnowledgeRecord peek_latest(void) const
Peeks at, but does not consume, the latest record at the local index (not the producer index).
size_t size(void) const
Returns the maximum size of the NativeCircularBufferConsumer.
bool operator!=(const NativeCircularBufferConsumer &value) const
Checks for inequality.
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
Copyright(c) 2020 Galois.