MADARA  3.4.1
CheckpointStreamer.cpp
Go to the documentation of this file.
1 
#include <chrono>
#include <thread>

#include "CheckpointStreamer.h"

#include "madara/logger/Logger.h"
15 
16 namespace sc = std::chrono;
17 
18 namespace madara
19 {
20 namespace knowledge
21 {
23 {
24  ContextGuard context_guard(*context_);
25 
26  in_buffer.emplace_back(std::move(name), std::move(record));
27 }
28 
29 namespace
30 {
31 class CheckpointStreamerLister : public VariablesLister
32 {
33 public:
34  using pair_type = std::pair<std::string, KnowledgeRecord>;
35  using vector_type = std::vector<pair_type>;
36  using iterator_type = vector_type::const_iterator;
37 
38  CheckpointStreamerLister(const vector_type& vec) : vec_(&vec) {}
39 
40  void start(const CheckpointSettings& settings) override
41  {
42  (void)settings;
43  iter_ = vec_->begin();
44  }
45 
46  std::pair<const char*, const KnowledgeRecord*> next() override
47  {
48  std::pair<const char*, const KnowledgeRecord*> ret{nullptr, nullptr};
49 
50  if (iter_ == vec_->end())
51  {
52  return ret;
53  }
54 
55  ret.first = iter_->first.c_str();
56  ret.second = &iter_->second;
57 
58  ++iter_;
59 
60  return ret;
61  }
62 
63 private:
64  const vector_type* vec_;
65  iterator_type iter_;
66 };
67 }
68 
70 {
71  auto period = sc::microseconds(int64_t(1000000 / self->write_hertz_));
72  auto wakeup = sc::steady_clock::now() + period;
73 
74  madara_logger_log(self->context_->get_logger(), logger::LOG_MINOR,
75  "CheckpointStreamer::thread_main:"
76  " created thread at %f hertz (%d ns period)\n",
77  self->write_hertz_, sc::duration_cast<sc::nanoseconds>(period).count());
78 
79  self->settings_.variables_lister = nullptr;
80 
81  while (self->keep_running_.test_and_set())
82  {
83  {
84  ContextGuard context_guard(*self->context_);
85 
86  using std::swap;
87  swap(self->in_buffer, self->out_buffer);
88 
89  madara_logger_log(self->context_->get_logger(), logger::LOG_TRACE,
90  "CheckpointStreamer::thread_main:"
91  " woke up after %d ns and found %d updates\n",
92  sc::duration_cast<sc::nanoseconds>(period).count(),
93  self->out_buffer.size());
94  }
95 
96  if (self->out_buffer.size() > 0)
97  {
98  CheckpointStreamerLister lister{self->out_buffer};
99  self->settings_.variables_lister = &lister;
100 
101  self->context_->save_checkpoint(self->settings_);
102 
103  self->settings_.variables_lister = nullptr;
104 
105  self->out_buffer.clear();
106  }
107 
109  wakeup += period;
110  }
111 }
112 
// NOTE(review): the definition header for this body is not present in this
// listing; presumably it belongs to a CheckpointStreamer member (likely the
// destructor) — confirm against the original source.
// The body's only action is a call to terminate().
114 {
115  terminate();
116 }
117 }
118 } // namespace madara::knowledge
iterator_type iter_
const vector_type * vec_
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
Implementation of BaseStreamer which writes updates to a Madara checkpoint file.
void enqueue(std::string name, KnowledgeRecord record) override
Implementation of BaseStreamer::enqueue, which stores the given parameters in an in-memory buffer,...
static void thread_main(CheckpointStreamer *self)
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class encapsulates an entry in a KnowledgeBase.
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
Duration sleep_until(TimeValue wake)
Definition: Utility.cpp:612
Copyright(c) 2020 Galois.