// Short alias for the std::chrono namespace, used below for durations and clocks.
16 namespace sc = std::chrono;
26 in_buffer.emplace_back(std::move(name), std::move(record));
// One buffered update: a string paired with its KnowledgeRecord.
34 using pair_type = std::pair<std::string, KnowledgeRecord>;
// Sequence of buffered updates.
35 using vector_type = std::vector<pair_type>;
// Read-only iterator over a vector_type.
36 using iterator_type = vector_type::const_iterator;
// Constructs a lister over the given vector of (name, record) pairs.
// Only the address of `vec` is stored in vec_; nothing is copied here.
// NOTE(review): presumably `vec` must outlive this lister — confirm at call sites.
38 CheckpointStreamerLister(
const vector_type& vec) :
vec_(&vec) {}
40 void start(
const CheckpointSettings& settings)
override
46 std::pair<const char*, const KnowledgeRecord*> next()
override
48 std::pair<const char*, const KnowledgeRecord*> ret{
nullptr,
nullptr};
55 ret.first =
iter_->first.c_str();
56 ret.second = &
iter_->second;
71 auto period = sc::microseconds(int64_t(1000000 / self->write_hertz_));
72 auto wakeup = sc::steady_clock::now() + period;
75 "CheckpointStreamer::thread_main:"
76 " created thread at %f hertz (%d ns period)\n",
77 self->write_hertz_, sc::duration_cast<sc::nanoseconds>(period).count());
79 self->settings_.variables_lister =
nullptr;
81 while (self->keep_running_.test_and_set())
87 swap(self->in_buffer, self->out_buffer);
90 "CheckpointStreamer::thread_main:"
91 " woke up after %d ns and found %d updates\n",
92 sc::duration_cast<sc::nanoseconds>(period).count(),
93 self->out_buffer.size());
96 if (self->out_buffer.size() > 0)
98 CheckpointStreamerLister lister{
self->out_buffer};
99 self->settings_.variables_lister = &lister;
101 self->context_->save_checkpoint(self->settings_);
103 self->settings_.variables_lister =
nullptr;
105 self->out_buffer.clear();
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Implementation of BaseStreamer which writes updates to a Madara checkpoint file.
std::vector< pair_type > in_buffer
void enqueue(std::string name, KnowledgeRecord record) override
Implementation of BaseStreamer::enqueue, which stores the given parameters in an in-memory buffer,...
ThreadSafeContext * context_
~CheckpointStreamer() override
static void thread_main(CheckpointStreamer *self)
A thread-safe guard for a context or knowledge base.
This class encapsulates an entry in a KnowledgeBase.
Provides functions and classes for the distributed knowledge base.
Duration sleep_until(TimeValue wake)
Copyright(c) 2020 Galois.