MADARA  3.4.1
CheckpointSettings.h
Go to the documentation of this file.
1 #ifndef _MADARA_KNOWLEDGE_CHECKPOINTSETTINGS_H_
2 #define _MADARA_KNOWLEDGE_CHECKPOINTSETTINGS_H_
3 
12 #include <string>
13 #include <map>
14 #include <memory>
15 
17 #include "madara/utility/Utility.h"
20 #include <stdio.h>
21 
22 namespace madara
23 {
24 namespace knowledge
25 {
26 class ThreadSafeContext;
27 class VariablesLister;
28 
36 {
37 public:
42 
// Member-initializer list and empty body of a CheckpointSettings constructor.
// NOTE(review): the declaration line that precedes this list is not present
// in this listing; presumably this is the default constructor — confirm
// against the original header.
// Defaults visible here: a 1,024,000 byte buffer, an empty filename, no
// clearing of knowledge, no timestamp/lamport override, the file is not kept
// open, the checkpoint is reset, and the header check is performed.
47  : buffer_size(1024000),
48  clear_knowledge(false),
49  filename(""),
51  last_timestamp(0),
54  states(0),
56  override_timestamp(false),
57  override_lamport(false),
58  keep_open(false),
59  initial_state(0),
// last_state is declared uint64_t, so -1 converts to the largest
// representable value (same as the (uint64_t)-1 default used by the
// parameterized constructor).
60  last_state(-1),
61  reset_checkpoint(true),
62  ignore_header_check(false)
63  {
64  }
65 
/**
 * Constructor that sets every listed setting explicitly. Each argument is
 * copied into the member of the same name (without the t_ prefix); the body
 * itself is empty.
 *
 * @param t_buffer_size            size of the buffer needed for the checkpoint
 * @param t_clear_knowledge        if true, during loads, clear the
 *                                 KnowledgeBase first
 * @param t_filename               value stored in filename
 * @param t_initial_timestamp      initial wallclock time saved in the checkpoint
 * @param t_last_timestamp         final wallclock time saved in the checkpoint
 * @param t_initial_lamport_clock  initial lamport clock saved in the checkpoint
 * @param t_last_lamport_clock     final lamport clock saved in the checkpoint
 * @param t_originator             originator id of the checkpoint
 * @param t_prefixes               list of prefixes to save/load
 * @param t_states                 number of states checkpointed in the file
 *                                 stream
 * @param t_version                the MADARA version
 * @param t_override_timestamp     use the timestamps in this class instead of
 *                                 the current wallclock time when writing
 * @param t_override_lamport       use the lamport clocks in this class instead
 *                                 of the KB clock when writing
 * @param t_keep_open              if true, keep the file open to avoid
 *                                 open/close overhead
 * @param t_initial_state          initial state number of interest
 * @param t_last_state             last state number of interest; defaults to
 *                                 the largest uint64_t value
 * @param t_reset_checkpoint       if true, resets the checkpoint to start a
 *                                 new diff from this point forward
 * @param t_ignore_header_check    if true, do not perform a header check
 * @param t_variables_lister       object used to extract variables for
 *                                 checkpoint saving; the raw pointer is stored
 *                                 as given
 * @param t_max_buffer_size        the max size the buffer can grow to
 */
89  CheckpointSettings(size_t t_buffer_size, bool t_clear_knowledge,
90  std::string t_filename = "", uint64_t t_initial_timestamp = 0,
91  uint64_t t_last_timestamp = 0, uint64_t t_initial_lamport_clock = 0,
92  uint64_t t_last_lamport_clock = 0, std::string t_originator = "",
93  const std::vector<std::string>& t_prefixes = {}, uint64_t t_states = 0,
94  std::string t_version = "", bool t_override_timestamp = false,
95  bool t_override_lamport = false, bool t_keep_open = false,
96  uint64_t t_initial_state = 0, uint64_t t_last_state = (uint64_t)-1,
97  bool t_reset_checkpoint = true, bool t_ignore_header_check = false,
98  VariablesLister* t_variables_lister = nullptr,
99  size_t t_max_buffer_size = 2000000000)
100  : buffer_size(t_buffer_size),
101  clear_knowledge(t_clear_knowledge),
102  filename(t_filename),
103  initial_timestamp(t_initial_timestamp),
104  last_timestamp(t_last_timestamp),
105  initial_lamport_clock(t_initial_lamport_clock),
106  last_lamport_clock(t_last_lamport_clock),
107  originator(t_originator),
108  prefixes(t_prefixes),
109  states(t_states),
110  version(t_version),
111  override_timestamp(t_override_timestamp),
112  override_lamport(t_override_lamport),
113  keep_open(t_keep_open),
114  initial_state(t_initial_state),
115  last_state(t_last_state),
116  reset_checkpoint(t_reset_checkpoint),
117  ignore_header_check(t_ignore_header_check),
118  variables_lister(t_variables_lister),
119  max_buffer_size(t_max_buffer_size)
120  {
121  }
122 
/**
 * Copy constructor. Defaulted, so every member is copied by the
 * compiler-generated memberwise copy.
 * @param rhs  the settings to copy from
 */
127  CheckpointSettings(const CheckpointSettings& rhs) = default;
// NOTE(review): the commented-out block below looks like a retired
// hand-written copy constructor. It does not mention variables_lister,
// max_buffer_size or playback_simtime, so it must not be re-enabled as-is.
128  /*
129  : buffer_size (rhs.buffer_size),
130  clear_knowledge (rhs.clear_knowledge),
131  filename (rhs.filename),
132  initial_timestamp (rhs.initial_timestamp),
133  last_timestamp (rhs.last_timestamp),
134  initial_lamport_clock (rhs.initial_lamport_clock),
135  last_lamport_clock (rhs.last_lamport_clock),
136  originator (rhs.originator),
137  prefixes (rhs.prefixes),
138  states (rhs.states),
139  version (rhs.version),
140  override_timestamp (rhs.override_timestamp),
141  override_lamport (rhs.override_lamport),
142  buffer_filters (rhs.buffer_filters),
143  keep_open (rhs.keep_open),
144  initial_state (rhs.initial_state),
145  last_state (rhs.last_state),
146  reset_checkpoint (rhs.reset_checkpoint),
147  ignore_header_check (rhs.ignore_header_check),
148  checkpoint_file (rhs.checkpoint_file)
149  {
150  }*/
151 
/**
 * Destructor. Defaulted; members are released by their own destructors.
 */
155  ~CheckpointSettings() = default;
156 
/**
 * Calls encode on the buffer filter chain, in order from first filter to
 * last, operating in place on the caller's buffer. After each filter a
 * 20 byte filter header is written in front of that filter's output.
 *
 * @param source    buffer that holds the data and receives the encoded result
 * @param size      byte count passed to the first filter (presumably the
 *                  number of bytes currently used in source — confirm)
 * @param max_size  capacity limit passed to each filter and used for the
 *                  header-fit check
 * @return          the value of size after the last filter has been applied
 * @throws exceptions::MemoryException  if max_size is not greater than the
 *                  filter's output size plus 20
 *
 * NOTE(review): several lines of this function are missing from this
 * listing (the opening lines of the logging calls, the declaration of
 * `header`, and listing line 194); comments below describe only what is
 * visible.
 */
164  int encode(char* source, int size, int max_size) const
165  {
166  // encode from front to back
167  for (filters::BufferFilters::const_iterator i = buffer_filters.begin();
168  i != buffer_filters.end(); ++i)
169  {
// arguments of a log call whose opening line is not in this listing
171  "CheckpointSettings::encode: size before encode: "
172  " %d of %d\n",
173  size, max_size);
174 
// the filter rewrites source in place and reports the new size
175  size = (*i)->encode(source, size, max_size);
176 
178  "CheckpointSettings::encode: size after encode: "
179  " %d of %d\n",
180  size, max_size);
181 
// NOTE(review): 20 is presumably filters::BufferFilterHeader::encoded_size()
// (decode uses that call instead of a literal) — confirm and keep in sync.
182  if (max_size > size + 20)
183  {
// shift the payload right to make room for the header; memmove is used
// because the source and destination ranges overlap
184  memmove(source + 20, source, size);
185 
// populate the header from the filter, then record the payload size
187  header.read(*i);
188  header.size = (uint64_t)size;
189 
190  int64_t buffer_remaining = 20;
191 
// serialize the header into the 20 bytes freed at the front of source
192  header.write((char*)source, buffer_remaining);
193 
// NOTE(review): listing line 194 is missing here; verify that size is
// advanced to account for the header bytes before the next iteration/return.
195 
197  "CheckpointSettings::encode: header: "
198  "%s:%s within size %d\n",
199  header.id, utility::to_string_version(header.version).c_str(),
200  size);
201  }
202  else
203  {
// not enough room for payload plus header: report and abort the encode
204  std::stringstream buffer;
205  buffer << "CheckpointSettings::encode: ";
206  buffer << (size + 20) << " 20 byte size encoding cannot fit in ";
207  buffer << max_size << " byte buffer\n";
208 
209  throw exceptions::MemoryException(buffer.str());
210  }
211  }
212 
213  return size;
214  }
215 
/**
 * Calls decode on the buffer filter chain, in order from last filter to
 * first, operating in place on the caller's buffer.
 *
 * If there are no buffer filters and ignore_header_check is false, the
 * leading header id is inspected instead: anything other than "karl" or
 * "KaRL" makes the function return 0.
 *
 * @param source    buffer that holds the encoded data and receives the
 *                  decoded result
 * @param size      returned unchanged when no filter runs; otherwise it is
 *                  overwritten by each filter's decode result
 * @param max_size  capacity limit; a header size larger than this throws
 * @return          size after the last applied filter, or 0 if a header id
 *                  is unrecognized, a filter pointer is null, or a header
 *                  does not match its filter
 * @throws exceptions::MemoryException  if a header reports a size greater
 *                  than max_size, or via the else branch near the end of
 *                  the loop
 *
 * NOTE(review): several lines are missing from this listing (declarations
 * of `header`, the initializers of `local_buffer_size`, the `if` condition
 * that opens the loop body, and the opening lines of the logging calls);
 * comments below describe only what is visible.
 */
223  int decode(char* source, int size, int max_size) const
224  {
225  if (buffer_filters.size() == 0 && !ignore_header_check)
226  {
227  // if we don't have buffer filters, do a check to see if we should
229  int64_t local_buffer_size =
231 
232  header.read((char*)source, local_buffer_size);
233 
// terminate the id after four characters so it can be used as a C string
234  header.id[4] = 0;
235 
236  std::string header_id(header.id);
237 
238  // id is either karl or KaRL. If it's anything else, then error
239  if (header_id == "karl" || header_id == "KaRL")
240  {
242  "CheckpointSettings::decode: header: "
243  " Detected %s\n",
244  header.id);
245  }
246  else
247  {
249  "CheckpointSettings::decode: header: "
250  " Detected %s, which is not a message or checkpoint header\n",
251  header.id);
252 
253  return 0;
254  }
255  }
256 
257  // decode from back to front
258  for (filters::BufferFilters::const_reverse_iterator i =
259  buffer_filters.rbegin();
260  i != buffer_filters.rend(); ++i)
261  {
// NOTE(review): the condition guarding this brace is not in this listing;
// the matching else branch below throws a "cannot fit" MemoryException.
263  {
265  int64_t local_buffer_size =
267 
// read the filter header stored in front of the payload
268  header.read((char*)source, local_buffer_size);
269 
// reject headers that claim more bytes than the buffer can hold; this also
// bounds the (int)header.size casts used further down
270  if (header.size > (uint64_t)max_size)
271  {
272  std::stringstream buffer;
273  buffer << "CheckpointSettings::decode: ";
274  buffer << header.size << " byte size encoding cannot fit in ";
275  buffer << max_size << " byte buffer\n";
276 
277  throw exceptions::MemoryException(buffer.str());
278  }
279 
281  "CheckpointSettings::decode: header: "
282  " %s:%s\n",
283  header.id, utility::to_string_version(header.version).c_str());
284 
// a null filter pointer cannot be used; give up and report 0 bytes
285  if (*i == 0)
286  {
288  "CheckpointSettings::decode: filter is null somehow\n");
289 
290  return 0;
291  }
292  else
293  {
296  "CheckpointSettings::decode: filter is not null\n");
297  }
298 
// the header/filter compatibility check is skipped entirely when
// ignore_header_check is set
299  if (ignore_header_check || header.check_filter(*i))
300  {
302  "CheckpointSettings::decode: buffer filter %s is a match\n",
303  header.id);
304  }
305  else
306  {
308  "CheckpointSettings::decode: buffer filter %s doesn't match."
309  " Returning 0.\n",
310  header.id);
311 
312  return 0;
313  }
314 
316  "CheckpointSettings::decode: size before decode: "
317  " %d of %d (header.size=%d)\n",
318  size, max_size, (int)header.size);
319 
// decode the payload that starts right after the header, using the size
// recorded in the header rather than the incoming size argument
320  size =
321  (*i)->decode(source + filters::BufferFilterHeader::encoded_size(),
322  (int)header.size, max_size);
323 
325  "CheckpointSettings::decode: size after decode: "
326  " %d of %d (header.size=%d)\n",
327  size, max_size, (int)header.size);
328 
// slide the decoded payload to the front of source, overwriting the header;
// memmove is used because the ranges can overlap
329  if (size > 0)
330  {
331  memmove(source, source + filters::BufferFilterHeader::encoded_size(),
332  size);
333  }
334  }
335  else
336  {
337  std::stringstream buffer;
338  buffer << "CheckpointSettings::decode: ";
339  buffer << size << " byte size encoding cannot fit in ";
340  buffer << max_size << " byte buffer\n";
341 
342  throw exceptions::MemoryException(buffer.str());
343  }
344  }
345 
346  return size;
347  }
348 
// Data members of CheckpointSettings.
// NOTE(review): this listing omits several member declarations (the numbered
// blank lines below mark the gaps); only the visible members are documented.

/// the size of the buffer needed for the checkpoint
352  size_t buffer_size;
353 
358 
363 
368 
/// final wallclock time saved in the checkpoint
372  uint64_t last_timestamp;
373 
378 
383 
388 
/// a list of prefixes to save/load
392  std::vector<std::string> prefixes;
393 
/// the number of states checkpointed in the file stream
397  uint64_t states;
398 
403 
409 
415 
420 
/// if true, keep the file open to avoid open/close overhead
426  bool keep_open;
427 
/// the initial state number of interest (useful for loading ranges of
/// checkpoint states)
433  uint64_t initial_state;
434 
/// the last state number of interest (useful for loading ranges of
/// checkpoint states)
440  uint64_t last_state;
441 
447 
453 
/// if true, update simtime during playback to match recorded TOI
458  bool playback_simtime = false;
459 
469 
/// the max size the buffer can grow to
473  size_t max_buffer_size = 2000000000;
474 
475 private:
/// a thread-safe ref-counted file handle for quick access to an open
/// checkpoint binary file
480  std::shared_ptr<FILE> checkpoint_file;
481 };
482 
// Body of the abstract VariablesLister interface (both operations are pure
// virtual). NOTE(review): the `class VariablesLister` line itself is not
// present in this listing.
484 {
485 public:
// virtual so that implementations can be destroyed through a base pointer
486  virtual ~VariablesLister() = default;
487 
// Receives the checkpoint settings; presumably called once before next()
// to begin an enumeration — confirm against the implementations.
488  virtual void start(const CheckpointSettings& settings) = 0;
// Returns a (C string, record pointer) pair; presumably the next variable's
// name and value. How the end of the sequence is signalled is not visible
// here — confirm against the implementations.
489  virtual std::pair<const char*, const KnowledgeRecord*> next() = 0;
490 };
491 }
492 }
493 #endif //_MADARA_KNOWLEDGE_CHECKPOINTSETTINGS_H_
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
An exception for general memory errors like out-of-memory.
Defines a buffer filter header.
uint64_t size
the size of this header plus the updates
static uint64_t encoded_size(void)
Returns the size of the encoded BufferFilterHeader class.
char * write(char *buffer, int64_t &buffer_remaining)
Writes a BufferFilterHeader instance to a buffer and updates the amount of buffer room remaining.
void read(filters::BufferFilter *filter)
Reads relevant fields from a filter.
bool check_filter(filters::BufferFilter *filter)
Checks compatibility between the header and the filter.
Holds settings for checkpoints to load or save.
bool playback_simtime
If true, update simtime during playback to match recorded TOI.
size_t max_buffer_size
the max size the buffer can grow to
std::string originator
the originator id of the checkpoint
CheckpointSettings(size_t t_buffer_size, bool t_clear_knowledge, std::string t_filename="", uint64_t t_initial_timestamp=0, uint64_t t_last_timestamp=0, uint64_t t_initial_lamport_clock=0, uint64_t t_last_lamport_clock=0, std::string t_originator="", const std::vector< std::string > &t_prefixes={}, uint64_t t_states=0, std::string t_version="", bool t_override_timestamp=false, bool t_override_lamport=false, bool t_keep_open=false, uint64_t t_initial_state=0, uint64_t t_last_state=(uint64_t) -1, bool t_reset_checkpoint=true, bool t_ignore_header_check=false, VariablesLister *t_variables_lister=nullptr, size_t t_max_buffer_size=2000000000)
Constructor.
bool ignore_header_check
If true, do not perform a header check.
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoints
std::string version
the MADARA version
friend ThreadSafeContext
Allow for ThreadSafeContext to update private data members.
bool keep_open
if true, keep the file open to avoid open/close overhead when programmatically iterating through checkpoints
size_t buffer_size
the size of the buffer needed for the checkpoint
std::shared_ptr< FILE > checkpoint_file
a thread-safe ref-counted file handle for quick access to an open checkpoint binary file
CheckpointSettings(const CheckpointSettings &rhs)=default
Copy constructor.
std::vector< std::string > prefixes
A list of prefixes to save/load.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states).
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
VariablesLister * variables_lister
Object which will be used to extract variables for checkpoint saving.
int decode(char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
uint64_t states
the number of states checkpointed in the file stream
int encode(char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
bool reset_checkpoint
If true, resets the checkpoint to start a new diff from this point forward.
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
filters::BufferFilters buffer_filters
buffer filters.
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints
~CheckpointSettings()=default
Destructor.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
virtual std::pair< const char *, const KnowledgeRecord * > next()=0
virtual void start(const CheckpointSettings &settings)=0
std::list< BufferFilter * > BufferFilters
Definition: BufferFilter.h:65
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
Provides utility functions and classes for common tasks and needs.
Definition: IteratorImpl.h:15
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Definition: Utility.cpp:64
std::string get_version(void)
Gets the MADARA version number.
Definition: Utility.cpp:23
Copyright(c) 2020 Galois.