MADARA  3.4.1
CircularBufferConsumer.inl
Go to the documentation of this file.
1 
2 #ifndef _MADARA_KNOWLEDGE_CONTAINERS_CIRCULARBUFFERCONSUMER_INL_
3 #define _MADARA_KNOWLEDGE_CONTAINERS_CIRCULARBUFFERCONSUMER_INL_
4 
5 #include <sstream>
6 #include <math.h>
7 
13 
14 namespace madara
15 {
16 namespace knowledge
17 {
18 namespace containers
19 {
21  : context_(0), local_index_(-1)
22 {
23 }
24 
// Guard used by the other methods: reacts when name_ is the empty string.
// `func` is the caller's function name and is spliced into the error text.
25 inline void CircularBufferConsumer::check_name(const char* func) const
26 {
27  if (name_ == "")
28  {
// NOTE(review): listing line 29 is absent from this extract. It presumably
// opens the throw expression that receives the message built on the next
// line -- confirm against the original file.
30  std::string("CircularBufferConsumer::") + func + ": name is empty.");
31  }
32 }
33 
34 inline void CircularBufferConsumer::check_context(const char* func) const
35 {
36  check_name(func);
37  if (!context_)
38  {
39  throw exceptions::ContextException(std::string("CircularBufferConsumer::") +
40  func + ": context is not set.");
41  }
42 }
43 
44 inline void CircularBufferConsumer::check_all(const char* func) const
45 {
46  std::string reason = "";
47  if (context_ == 0)
48  {
49  reason = "context has not been set";
50  }
51 
52  if (name_ == "")
53  {
54  if (reason.size() > 0)
55  {
56  reason += " and ";
57  }
58  reason = "name has not been set";
59  }
60 
61  if (buffer_.size() == 0)
62  {
63  if (reason.size() > 0)
64  {
65  reason += " and ";
66  }
67  reason = "size == 0";
68  }
69 
70  if (reason != "")
71  {
72  std::stringstream message;
73  message << "CircularBufferConsumer::" << func << ": ";
74  message << "Invalid access because " << reason << "\n";
75  throw exceptions::IndexException(message.str());
76  }
77 }
78 
80  const std::string& name, KnowledgeBase& knowledge)
81  : context_(&(knowledge.get_context())), name_(name), local_index_(-1)
82 {
83  check_name(__func__);
84 
85  ContextGuard context_guard(knowledge);
86  set_name(name, knowledge);
87 }
88 
90  const std::string& name, Variables& knowledge)
91  : context_(knowledge.get_context()), name_(name), local_index_(-1)
92 {
93  check_name(__func__);
94 
95  ContextGuard context_guard(*knowledge.get_context());
96  set_name(name, knowledge);
97 }
98 
100  const CircularBufferConsumer& value) const
101 {
102  return name_ == value.get_name();
103 }
104 
106  const CircularBufferConsumer& value) const
107 {
108  return name_ != value.get_name();
109 }
110 
114 {
115  KnowledgeRecord::Integer result = base + value;
116  if (buffer_.size() > 0 && base + value >= 0)
117  {
118  return (result) % (KnowledgeRecord::Integer)buffer_.size();
119  }
120  else if (buffer_.size() > 0)
121  {
122  return (KnowledgeRecord::Integer)buffer_.size() + result;
123  }
124  else
125  {
126  std::stringstream message;
127  message << "CircularBufferConsumer::increment: ";
128  message << "Result of " << base << "+" << value << " is " << result;
129  message << " which is impossible to access with size " << size() << ".\n";
130  throw exceptions::IndexException(message.str());
131  }
132 }
133 
135  void) const
136 {
137  check_all(__func__);
138 
139  ContextGuard context_guard(*context_);
140 
142 
143  if (index_diff > (KnowledgeRecord::Integer)buffer_.size())
144  {
146  }
147 
149 
150  if (remaining() > 0)
151  {
152  ++local_index_;
153  return context_->get(buffer_.vector_[(size_t)(cur)]);
154  }
155  else
156  return KnowledgeRecord();
157 }
158 
160  size_t& dropped) const
161 {
162  check_all(__func__);
163 
164  ContextGuard context_guard(*context_);
165 
167 
168  dropped = get_dropped();
169 
170  if (index_diff > (KnowledgeRecord::Integer)buffer_.size())
171  {
173  }
174 
176 
177  if (remaining() > 0)
178  {
179  ++local_index_;
180  return context_->get(buffer_.vector_[(size_t)(cur)]);
181  }
182  else
183  return KnowledgeRecord();
184 }
185 
187  KnowledgeRecord::Integer position) const
188 {
189  check_context(__func__);
190 
191  ContextGuard context_guard(*context_);
192 
193  // If buffer overflowed, update local index to last valid value - 1
194  KnowledgeRecord::Integer index_diff = (*index_ - local_index_);
195  if (index_diff > (KnowledgeRecord::Integer)buffer_.size())
196  {
198  }
199 
200  KnowledgeRecord::Integer requested_index = local_index_ + position;
201 
202  if (0 <= requested_index &&
203  (*index_ - (KnowledgeRecord::Integer)buffer_.size()) <= requested_index &&
204  requested_index <= *index_)
205  {
206  size_t index =
207  (size_t)increment(local_index_, (KnowledgeRecord::Integer)position);
208 
209  return context_->get(buffer_.vector_[index]);
210  }
211  else
212  {
213  std::stringstream message;
214  message << "CircularBufferConsumer::inspect: ";
215  message << "Invalid access for relative position " << position
216  << " when buffer index is ";
217  message << *index_ << " and local index is : " << local_index_
218  << " and size is : " << size() << "\n";
219  throw exceptions::IndexException(message.str());
220  }
221 }
222 
// Reads `count` records from buffer_ without advancing local_index_ inside
// the visible code, starting from an index derived from `position`
// (a position relative to local_index_). Throws exceptions::IndexException
// when the requested index fails the range check below.
223 inline std::vector<KnowledgeRecord> CircularBufferConsumer::inspect(
224  KnowledgeRecord::Integer position, size_t count) const
225 {
226  check_context(__func__);
227 
228  ContextGuard context_guard(*context_);
229 
230  // If buffer overflowed, update local index to last valid value - 1
231  KnowledgeRecord::Integer index_diff = (*index_ - local_index_);
232  if (index_diff > (KnowledgeRecord::Integer)buffer_.size())
233  {
// NOTE(review): listing line 234 (the body of this overflow branch) is
// absent from this extract.
235  }
236 
237  KnowledgeRecord::Integer requested_index = local_index_ + position;
238 
// Accept only indices that are non-negative, not newer than the producer
// index (*index_), and no more than buffer_.size() behind it.
239  if (0 <= requested_index &&
240  (*index_ - (KnowledgeRecord::Integer)buffer_.size()) <= requested_index &&
241  requested_index <= *index_)
242  {
// NOTE(review): listing lines 243-244 are absent from this extract; `index`
// used by the loop below is presumably declared there -- confirm.
245 
246  std::vector<KnowledgeRecord> result;
247 
// Walks forward one slot at a time via increment(); `count` itself is not
// bounded against buffer_.size() in the visible code.
248  for (size_t i = 0; i < count; ++i, index = increment(index, 1))
249  {
250  result.push_back(buffer_[(size_t)index]);
251  }
252 
253  return result;
254  }
255  else
256  {
257  std::stringstream message;
258  message << "CircularBufferConsumer::inspect: ";
259  message << "Invalid access for relative position " << position
260  << " when buffer index is ";
261  message << *index_ << " and local index is : " << local_index_
262  << " and size is : " << size() << "\n";
263  throw exceptions::IndexException(message.str());
264  }
265 }
266 
268 {
269  return name_;
270 }
271 
272 inline size_t CircularBufferConsumer::size(void) const
273 {
274  return buffer_.size();
275 }
276 
277 inline size_t CircularBufferConsumer::remaining(void) const
278 {
279  check_context(__func__);
280 
281  ContextGuard context_guard(*context_);
282 
283  return *index_ - local_index_;
284 }
285 
286 inline size_t CircularBufferConsumer::count(void) const
287 {
288  check_context(__func__);
289 
290  ContextGuard context_guard(*context_);
291 
292  return std::min((size_t)(*index_ + 1), buffer_.size());
293 }
294 
296 {
297  check_context(__func__);
298 
299  ContextGuard context_guard(*context_);
300 
301  buffer_.resize(-1, false);
302 }
303 
305 {
306  if (context_ && name_ != "")
307  {
308  ContextGuard context_guard(*context_);
309 
310  local_index_ = *index_;
311  }
312  else
313  {
315  "CircularBufferConsumer::resync: "
316  " context is null or name hasn't been set.");
317  }
318 }
319 
321  const std::string& name, KnowledgeBase& knowledge)
322 {
323  if (name != "")
324  {
325  ContextGuard context_guard(knowledge);
326  name_ = name;
327  context_ = &(knowledge.get_context());
328  index_.set_name(name + ".index", knowledge);
329  buffer_.set_name(name, knowledge);
330  local_index_ = -1;
331  }
332  else
333  {
335  "CircularBufferConsumer::set_name: empty name provided.");
336  }
337 }
338 
340  const std::string& name, Variables& knowledge)
341 {
342  if (name != "")
343  {
344  ContextGuard context_guard(*knowledge.get_context());
345  name_ = name;
346  context_ = knowledge.get_context();
347  index_.set_name(name + ".index", knowledge);
348  buffer_.set_name(name, knowledge);
349  local_index_ = -1;
350  }
351  else
352  {
354  "CircularBufferConsumer::set_name: empty name provided.");
355  }
356 }
357 
359 {
360  check_context(__func__);
361 
362  local_index_ = index;
363 }
364 
// Collects records by stepping backwards through buffer_ (increment(cur, -1))
// and then moves local_index_ to the buffer's current index (*index_).
// The number of records returned is min(count, *index_ - local_index_).
365 inline std::vector<KnowledgeRecord> CircularBufferConsumer::consume_latest(
366  size_t count) const
367 {
368  check_all(__func__);
369 
370  ContextGuard context_guard(*context_);
371 
372  std::vector<KnowledgeRecord> result;
373 
// NOTE(review): listing line 374 is absent from this extract; `cur`, used by
// the loop below, is presumably declared there -- confirm.
375 
// NOTE(review): the cap is the unread distance, not buffer_.size(); if the
// distance exceeds the buffer size the loop can wrap past a full lap.
376  count = std::min(count, (size_t)(*index_ - local_index_));
377 
378  for (size_t i = 0; i < count; ++i, cur = increment(cur, -1))
379  {
380  result.push_back(buffer_[(size_t)cur]);
381  }
382 
383  // note the difference here is that reading the latest will change index
384  local_index_ = *index_;
385 
386  return result;
387 }
388 
// Same backwards walk as the single-argument overload, and additionally
// stores the result of get_dropped() into `dropped` before consuming.
// The number of records returned is min(count, *index_ - local_index_).
389 inline std::vector<KnowledgeRecord> CircularBufferConsumer::consume_latest(
390  size_t count, size_t& dropped) const
391 {
392  check_all(__func__);
393 
394  ContextGuard context_guard(*context_);
395 
396  std::vector<KnowledgeRecord> result;
397 
// Computed before local_index_ is moved at the end of this function.
398  dropped = get_dropped();
399 
// NOTE(review): listing line 400 is absent from this extract; `cur`, used by
// the loop below, is presumably declared there -- confirm.
401 
402  count = std::min(count, (size_t)(*index_ - local_index_));
403 
404  for (size_t i = 0; i < count; ++i, cur = increment(cur, -1))
405  {
406  result.push_back(buffer_[(size_t)cur]);
407  }
408 
409  // note the difference here is that reading the latest will change index
410  local_index_ = *index_;
411 
412  return result;
413 }
414 
// Collects records by stepping forwards through buffer_ (increment(cur, 1))
// and advances local_index_ by the number of records returned.
// The number returned is capped at min(count, index_diff), where index_diff
// is itself capped at buffer_.size() by the overflow branch.
415 inline std::vector<KnowledgeRecord> CircularBufferConsumer::consume_earliest(
416  size_t count) const
417 {
418  check_all(__func__);
419 
420  ContextGuard context_guard(*context_);
421  std::vector<KnowledgeRecord> result;
422 
423  KnowledgeRecord::Integer index_diff = (*index_ - local_index_);
424 
425  // If buffer overflowed, update local index to last valid value - 1
426  if (index_diff > (KnowledgeRecord::Integer)buffer_.size())
427  {
// NOTE(review): listing line 428 is absent from this extract; going by the
// comment above it presumably adjusts local_index_ -- confirm.
429  index_diff = (KnowledgeRecord::Integer)buffer_.size();
430  }
431 
432  count = std::min(count, (size_t)index_diff);
433 
434  // start is either 0 or index_ + 1
// NOTE(review): listing lines 435 and 438 are absent from this extract; they
// presumably hold the declaration of `cur` and the other arm of this
// conditional expression -- confirm.
436  index_diff < (KnowledgeRecord::Integer)buffer_.size()
437  ? increment(local_index_, 1)
439 
440  for (size_t i = 0; i < count; ++i, cur = increment(cur, 1))
441  {
442  result.push_back(buffer_[(size_t)cur]);
443  }
444 
445  local_index_ += (KnowledgeRecord::Integer)result.size();
446 
447  return result;
448 }
449 
// Forward-walking consume that also stores the result of get_dropped() into
// `dropped`. Advances local_index_ by the number of records returned.
450 inline std::vector<KnowledgeRecord> CircularBufferConsumer::consume_earliest(
451  size_t count, size_t& dropped) const
452 {
453  check_all(__func__);
454 
455  ContextGuard context_guard(*context_);
456  std::vector<KnowledgeRecord> result;
457 
458  KnowledgeRecord::Integer index_diff = (*index_ - local_index_);
459 
460  dropped = get_dropped();
461 
// NOTE(review): unlike the single-argument overload, index_diff is not capped
// at buffer_.size() before this min(), so `count` may exceed the buffer size
// after an overflow -- confirm whether that is intended.
462  count = std::min(count, (size_t)index_diff);
463 
464  // start is either 0 or index_ + 1
// NOTE(review): listing lines 465 and 468 are absent from this extract; they
// presumably hold the declaration of `cur` and the other arm of this
// conditional expression -- confirm.
466  index_diff < (KnowledgeRecord::Integer)buffer_.size()
467  ? increment(local_index_, 1)
469 
470  for (size_t i = 0; i < count; ++i, cur = increment(cur, 1))
471  {
472  result.push_back(buffer_[(size_t)cur]);
473  }
474 
475  local_index_ += (KnowledgeRecord::Integer)result.size();
476 
477  return result;
478 }
479 
480 inline size_t CircularBufferConsumer::get_dropped(void) const
481 {
482  check_all(__func__);
483 
484  ContextGuard context_guard(*context_);
485 
486  size_t difference = remaining();
487  size_t buffer_size = size();
488 
489  if (difference > buffer_size)
490  {
491  return difference - buffer_size;
492  }
493  else
494  {
495  return 0;
496  }
497 }
498 
// Collects up to min(count, this->count()) records by stepping backwards
// through buffer_ (increment(cur, -1)). local_index_ is not modified in the
// visible code.
499 inline std::vector<KnowledgeRecord> CircularBufferConsumer::peek_latest(
500  size_t count) const
501 {
// NOTE(review): only the name is validated here, yet context_ is dereferenced
// on the next statement; sibling methods call check_context/check_all first.
// Confirm whether a null context_ can reach this point.
502  check_name(__func__);
503 
504  ContextGuard context_guard(*context_);
505 
506  count = std::min(count, this->count());
507 
508  std::vector<KnowledgeRecord> result;
509 
// NOTE(review): listing line 510 is absent from this extract; `cur`, used by
// the loop below, is presumably declared there -- confirm.
511 
512  for (size_t i = 0; i < count; ++i, cur = increment(cur, -1))
513  {
514  result.push_back(buffer_[(size_t)cur]);
515  }
516 
517  return result;
518 }
519 
521  void) const
522 {
523  check_all(__func__);
524 
525  ContextGuard context_guard(*context_);
526 
528  index = increment(index, 0);
529 
530  if (count() > 0)
531  return context_->get(buffer_.vector_[(size_t)index]);
532  else
533  return KnowledgeRecord();
534 }
535 
536 } // end containers namespace
537 } // end knowledge namespace
538 } // end madara namespace
539 
540 #endif // _MADARA_KNOWLEDGE_CONTAINERS_CIRCULARBUFFERCONSUMER_INL_
const ThreadSafeContext * context_
madara::knowledge::KnowledgeRecord KnowledgeRecord
An exception for attempting to access an invalid context.
An exception for out-of-bounds accessing in arrays/vectors.
An exception for setting an invalid name in MADARA.
Definition: NameException.h:16
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically returns the current value of a variable.
Provides an interface for external functions into the MADARA KaRL variable settings.
Definition: Variables.h:53
This container stores a thread-safe, personalized consumer for CircularBuffer instances.
std::string get_name(void) const
Returns the name of the variable.
size_t count(void) const
Returns the number of records in the CircularBufferConsumer.
size_t remaining(void) const
Returns the number of records remaining that have not been consumed with get_latest,...
KnowledgeRecord consume(void) const
Consumes the record at the local index (not the producer index)
void set_name(const std::string &name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
std::vector< KnowledgeRecord > consume_latest(size_t count) const
Consumes the latest (most recent) records, up to the given count, and moves the local index to the producer index.
knowledge::KnowledgeRecord inspect(KnowledgeRecord::Integer position) const
Retrieves a record at a position relative to local index.
size_t get_dropped(void) const
Returns the number of known drops since last consume.
void resync(void)
Sets the local index to the current buffer index.
KnowledgeRecord::Integer increment(KnowledgeRecord::Integer base, KnowledgeRecord::Integer value) const
Increments the base by the value, using size as a boundary.
std::vector< KnowledgeRecord > consume_earliest(size_t count) const
Consumes earliest records from the local index in the buffer.
Integer index_
Index for latest item in circular buffer.
bool operator!=(const CircularBufferConsumer &value) const
Checks for inequality.
void set_index(KnowledgeRecord::Integer index)
Sets the local index to an arbitrary position.
knowledge::KnowledgeRecord peek_latest(void) const
Peeks at the most recently added record.
ThreadSafeContext * context_
guard for access and changes
size_t size(void) const
Returns the maximum size of the CircularBufferConsumer.
KnowledgeRecord::Integer local_index_
Index for the latest item read by this consumer.
void resize(void)
Resizes the buffer size to the producer's buffer size.
bool operator==(const CircularBufferConsumer &value) const
Checks for equality.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:54
std::vector< VariableReference > vector_
Values of the array.
Definition: Vector.h:616
void set_name(const std::string &var_name, KnowledgeBase &knowledge, int size=-1)
Sets the variable name that this refers to.
Definition: Vector.inl:29
void resize(int size=-1, bool delete_vars=true)
Resizes the vector.
Definition: Vector.cpp:200
size_t size(void) const
Returns the size of the local vector.
Definition: Vector.inl:23
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
Copyright(c) 2020 Galois.