MADARA  3.4.1
CheckpointStreamer.h
Go to the documentation of this file.
1 #ifndef MADARA_KNOWLEDGE_CHECKPOINT_STREAMER_H_
2 #define MADARA_KNOWLEDGE_CHECKPOINT_STREAMER_H_
3 
4 #include <memory>
5 #include <vector>
6 #include <thread>
7 #include <atomic>
8 #include <chrono>
9 
10 #include "madara/MadaraExport.h"
14 
23 namespace madara
24 {
25 namespace knowledge
26 {
32 class MADARA_EXPORT CheckpointStreamer : public BaseStreamer
33 {
34 public:
46  double write_hertz = 10)
47  : settings_(std::move(settings)),
48  context_(&context),
49  write_hertz_(write_hertz),
50  thread_(thread_main, (keep_running_.test_and_set(), this))
51  {
52  }
53 
65  CheckpointSettings settings, KnowledgeBase& kb, double write_hertz = 10)
66  : CheckpointStreamer(std::move(settings), kb.get_context(), write_hertz)
67  {
68  }
69 
74  void enqueue(std::string name, KnowledgeRecord record) override;
75 
76  // This object spawns a thread which holds a pointer back to this object,
77  // so it cannot be safely copied or moved.
82 
83  ~CheckpointStreamer() override;
84 
85 private:
86  static void thread_main(CheckpointStreamer* self);
87 
88  void terminate()
89  {
90  keep_running_.clear();
91  if (thread_.joinable())
92  {
93  thread_.join();
94  }
95  }
96 
99 
100  using pair_type = std::pair<std::string, KnowledgeRecord>;
101 
102  std::vector<pair_type> in_buffer;
103  std::vector<pair_type> out_buffer;
104 
105  double write_hertz_ = 10;
106 
107  std::atomic_flag keep_running_;
108  std::thread thread_;
109 };
110 }
111 } // namespace madara::knowledge
112 
113 #endif // MADARA_KNOWLEDGE_CHECKPOINT_STREAMER_H_
const ThreadSafeContext * context_
Interface for knowledge update streaming.
Definition: BaseStreamer.h:24
Holds settings for checkpoints to load or save.
Implementation of BaseStreamer which writes updates to a Madara checkpoint file.
std::pair< std::string, KnowledgeRecord > pair_type
CheckpointStreamer(const CheckpointStreamer &)=delete
CheckpointStreamer(CheckpointSettings settings, KnowledgeBase &kb, double write_hertz=10)
Constructor.
CheckpointStreamer(CheckpointStreamer &&)=delete
CheckpointStreamer(CheckpointSettings settings, ThreadSafeContext &context, double write_hertz=10)
Constructor.
CheckpointStreamer & operator=(const CheckpointStreamer &)=delete
CheckpointStreamer & operator=(CheckpointStreamer &&)=delete
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
This class stores variables and their values for use by any entity needing state information in a thr...
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
Copyright(c) 2020 Galois.