MADARA  3.4.1
CheckpointPlayer.cpp
Go to the documentation of this file.
1 #include <fstream>
2 #include <chrono>
3 
10 
12 
13 #include "CheckpointPlayer.h"
14 
15 namespace sc = std::chrono;
16 
17 namespace madara
18 {
19 namespace knowledge
20 {
22 {
23  if (stage != 0)
24  {
25  return;
26  }
27 
29  "CheckpointReader::start:"
30  " opening file %s\n",
32 
33  file.open(
34  checkpoint_settings.filename.c_str(), std::ios::in | std::ios::binary);
35 
36  if (!file)
37  {
39  "CheckpointReader::start:"
40  " could not open file %s for reading. "
41  "Check that file exists and that permissions are appropriate.\n",
43  stage = 9;
44  return;
45  }
46 
47  file.seekg(0, file.end);
48  int length = file.tellg();
49  file.seekg(0, file.beg);
50 
52  "CheckpointReader::start:"
53  " file contains %d bytes.\n",
54  (int)length);
55 
56  {
57  size_t bytes = FileHeader::encoded_size();
58 
59  // if bytes is bigger than the buffer
61  {
63  "CheckpointReader::start:"
64  " %d byte file header is greater than CheckpointSettings"
65  " buffer_size of %d.\n",
66  (int)length, (int)checkpoint_settings.buffer_size);
67 
68  // if max buffer does not have enough room
70  {
71  std::stringstream buffer;
72  buffer << "CheckpointReader::start: ";
73  buffer << bytes << " is greater than CheckpointSettings ";
74  buffer << "max_buffer_size of ";
76  buffer << ". Increase buffer_size or max_buffer_size ";
77  buffer << " to stop this exception.";
78 
80  }
81  else // if max_buffer_size is enough room
82  {
83  max_buffer = bytes;
85  "CheckpointReader::start:"
86  " setting max_buffer to %d.\n",
87  (int)max_buffer);
88  }
89  }
90  else
91  {
94  "CheckpointReader::start:"
95  " setting max_buffer to %d.\n",
96  (int)max_buffer);
97  }
98  } // end scope
99 
101 
102  buffer = new char[max_buffer];
103  current = buffer.get_ptr();
104 
105  if (!file.read(buffer.get(), FileHeader::encoded_size()))
106  {
107  std::stringstream message;
108  message << "CheckpointReader::start: ";
109  message << "file ";
110  message << checkpoint_settings.filename;
111  message << " does not have enough room for an appropriate header";
112  throw exceptions::FileException(message.str());
113  }
114  total_read = file.tellg();
115 
116  buffer_remaining = (int64_t)total_read;
117 
119  "CheckpointReader::start:"
120  " reading file: %d bytes read.\n",
121  (int)total_read);
122 
124 
127  {
129  "CheckpointReader::start:"
130  " invalid file or wrong version. No contextual change.\n");
131  stage = 9;
132  return;
133  }
134 
135  // if there was something in the file, and it was the right header
136 
138 
144 
146  "CheckpointReader::start:"
147  " read File meta. Meta.size=%d. Meta.states=%d\n",
148  (int)meta.size, (int)meta.states);
149 
155  if (meta.states == 0)
156  {
157  stage = 9;
158  return;
159  }
160 
161  stage = 1;
162  state = 0;
163 }
164 
165 std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
166 {
167  if (stage == 0)
168  {
169  start();
170  }
171 
172  if (stage == 9)
173  {
174  return {};
175  }
176 
177  // Outer loop for progressing through stages
178  for (;;)
179  {
180  // We're iterating to next state
181  if (stage == 1)
182  {
184  {
186  "CheckpointReader::next:"
187  " done at state=%d of meta.states=%d\n",
188  (int)state, (int)meta.states);
189  stage = 9;
190  return {};
191  }
192 
194  "CheckpointReader::next:"
195  " reading 64bit unsigned size at %d byte file offset\n",
196  (int)checkpoint_start);
197 
198  // set the file pointer to the checkpoint header start
199  // fseek (file, (long)checkpoint_start, SEEK_SET);
200  file.seekg(checkpoint_start, file.beg);
201 
202  if (!file.read((char*)&checkpoint_size, sizeof(checkpoint_size)))
203  {
204  std::stringstream message;
205  message << "CheckpointReader::next: ";
206  message << "file ";
207  message << checkpoint_settings.filename;
208  message << " does not have enough room for a checkpoint";
209  throw exceptions::FileException(message.str());
210  }
211 
212  // total_read = fread (&checkpoint_size,
213  // 1, sizeof (checkpoint_size), file);
214 
215  total_read = sizeof(checkpoint_size);
216 
218 
219  {
220  size_t bytes = checkpoint_size;
221 
222  // if bytes is larger than the existing buffer size
223  if(bytes > (size_t)max_buffer)
224  {
226  "CheckpointReader::next:"
227  " %d bytes is greater than existing buffer size of %d\n",
228  (int)checkpoint_size, (int)max_buffer);
229 
230  // if the buffer needed is bigger than the maximum size
233  {
234  std::stringstream buffer;
235  buffer << "CheckpointReader::next: ";
236  buffer << bytes << " is greater than CheckpointSettings ";
237  buffer << "max_buffer_size of ";
239  buffer << ". Increase buffer_size or max_buffer_size ";
240  buffer << " to stop this exception.";
241 
242  throw exceptions::MemoryException(buffer.str());
243  }
244  else // max buffer needs to be set to the max size
245  {
246  //max_buffer = checkpoint_settings.max_buffer_size;
249  "CheckpointReader::start:"
250  " setting max_buffer to %d.\n",
251  (int)max_buffer);
252  }
253  }
254 
256  buffer = new char[max_buffer];
257  } // end scope
258 
259 
260  if (checkpoint_settings.buffer_filters.size() > 0)
261  {
263  }
264 
266  "CheckpointReader::next:"
267  " %d state checkpoint size is %d\n",
268  (int)state, (int)checkpoint_size);
269 
270  // set the file pointer to the checkpoint header start
271  file.seekg(checkpoint_start, file.beg);
272 
274 
276  "CheckpointReader::next:"
277  " reading %d bytes for full checkpoint\n",
278  (int)checkpoint_size);
279 
280  if (!file.read(buffer.get(), checkpoint_size))
281  {
282  std::stringstream message;
283  message << "CheckpointReader::next: ";
284  message << "file ";
285  message << checkpoint_settings.filename;
286  message << " does not have enough room for ";
287  message << checkpoint_size;
288  message << " bytes noted in header";
289  throw exceptions::FileException(message.str());
290  }
291  total_read = (int64_t)checkpoint_size;
292 
293  current = buffer.get_ptr();
294 
296  "CheckpointReader::next:"
297  " read %d bytes\n",
298  (int)total_read);
299 
300  buffer_remaining = (int64_t)total_read;
301 
303  "CheckpointReader::next:"
304  " decoding with %d buffer filters with initial size of "
305  "%d bytes and total buffer of %d bytes\n",
307  (int)max_buffer);
308 
309  // call decode with any buffer filters
311  current, (int)total_read, (int)max_buffer);
312 
313  if (buffer_remaining <= 0)
314  {
315  stage = 9;
317  "CheckpointReader::next: "
318  "decode () returned a negative encoding size. Bad filter/encode.");
319  }
320 
321  if (buffer_remaining <
323  {
324  stage = 9;
326  "CheckpointReader::next: "
327  "Not enough room in buffer for message header");
328  }
329 
331  "CheckpointReader::next:"
332  " Reading a checkpoint header with %d byte buffer remaining\n",
333  (int)buffer_remaining);
334 
336 
337  if (state == 0)
338  {
340  }
341 
342  if (state == meta.states - 1)
343  {
345  }
346 
347  uint64_t updates_size =
349 
351  "CheckpointReader::next:"
352  " read Checkpoint header. header.size=%d, updates.size=%d\n",
353  (int)checkpoint_header.size, (int)updates_size);
354 
360  if (updates_size > (uint64_t)buffer_remaining)
361  {
363  "CheckpointReader::next: "
364  "Not enough room in buffer for checkpoint");
365  } // end if allocation is needed
366 
368  "CheckpointReader::next:"
369  " state=%d, initial_state=%d, last_state=%d\n",
372 
375  {
376  stage = 2;
377  update = 0;
378  }
379  else
380  {
382  "CheckpointReader::next:"
383  " not a valid state, incrementing by %d bytes.\n",
384  (int)updates_size);
385 
386  current += updates_size;
387  }
388  ++state;
389  }
390 
391  // We're iterating to next update
392  if (stage == 2)
393  {
395  {
396  stage = 1;
397  continue;
398  }
399 
400  std::string key;
402  record.clock = checkpoint_header.clock;
404  current = (char*)record.read(current, key, buffer_remaining);
405 
407  "CheckpointReader::next:"
408  " read record (%d of %d): %s\n",
409  (int)update, (int)checkpoint_header.updates, key.c_str());
410 
411  // check if the prefix is allowed
412  if (checkpoint_settings.prefixes.size() > 0)
413  {
414  bool prefix_found = false;
415  for (size_t j = 0;
416  j < checkpoint_settings.prefixes.size() && !prefix_found; ++j)
417  {
419  "CheckpointReader::next:"
420  " checking record %s against prefix %s\n",
421  key.c_str(), checkpoint_settings.prefixes[j].c_str());
422 
424  key, checkpoint_settings.prefixes[j]))
425  {
427  "CheckpointReader::next:"
428  " record has the correct prefix.\n");
429 
430  prefix_found = true;
431  } // end if prefix success
432  } // end for all prefixes
433 
434  if (!prefix_found)
435  {
437  "CheckpointReader::next:"
438  " record does not have the correct prefix. Rejected.\n");
439 
440  ++update;
441  continue;
442  } // end if prefix found
443  } // end if there are prefixes in the checkpoint settings
444 
445  ++update;
446  return {key, record};
447  } // end for all updates
448  }
449 }
450 
452 {
453  uint64_t start_time = utility::get_time();
454  uint64_t first_toi = 0UL - 1;
455  uint64_t prev_toi = 0UL - 1;
456 
457  while (self->keep_running_.test_and_set())
458  {
459  auto cur = self->reader_->next();
460  if (cur.first == "")
461  {
462  break;
463  }
464 
465  uint64_t cur_toi = cur.second.toi();
466 
468  "CheckpointPlayer:"
469  " record has toi %lu. prev: %lu. first: %lu,. start: %lu\n",
470  cur_toi, prev_toi, first_toi, start_time);
471 
472  if (first_toi == 0UL - 1)
473  {
474  first_toi = cur_toi;
475  prev_toi = first_toi;
476 #ifdef MADARA_FEATURE_SIMTIME
477  if (self->settings_.playback_simtime)
478  {
479  utility::sim_time_notify(first_toi, NAN);
480  start_time = first_toi;
481  }
482 #endif
483  }
484 
485  uint64_t target_time = cur_toi - first_toi + start_time;
486 
488  "CheckpointPlayer: sleep_until: %lu\n", target_time);
489 
490  utility::sleep_until(target_time);
491 
492  prev_toi = cur_toi;
493 
494  self->context_->update_record_from_external(
495  cur.first, cur.second, self->update_settings_);
496  }
497 }
498 
499 bool CheckpointPlayer::play_until(uint64_t target_toi)
500 {
501  init_reader();
502  for (;;)
503  {
504  auto cur = reader_->next();
505  if (cur.first.empty())
506  {
507  return false;
508  }
509 
511  cur.first, cur.second, update_settings_);
512  std::cerr << "play_until: " << cur.second.toi() << " " << target_toi
513  << std::endl;
514 
515  if (cur.second.toi() >= target_toi)
516  {
517  return true;
518  }
519  }
520 }
521 }
522 } // namespace madara::knowledge
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
An exception for bad file interactions.
Definition: FileException.h:16
An exception for attempting to access an invalid context1.
An exception for general memory errors like out-of-memory.
static uint64_t encoded_size(void)
Returns the size of the encoded BufferFilterHeader class.
Plays back a checkpoint over time, based on recorded TOI.
KnowledgeUpdateSettings update_settings_
std::unique_ptr< CheckpointReader > reader_
bool play_until(uint64_t target_toi)
Loads values from checkpoint until it reaches or exceeds toi given.
static void thread_main(CheckpointPlayer *self)
CheckpointSettings & checkpoint_settings
transport::MessageHeader checkpoint_header
void start()
Begin by reading any header information.
std::pair< std::string, KnowledgeRecord > next()
Get the next update from the checkpoint file.
utility::ScopedArray< char > buffer
size_t max_buffer_size
the max size the buffer can grow to
std::string originator
the originator id of the checkpoint
std::string version
the MADARA version
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
int decode(char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
uint64_t states
the number of states checkpointed in the file stream
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
filters::BufferFilters buffer_filters
buffer filters.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
static uint32_t encoded_size(void)
Returns the size of the encoded FileHeader class, which may be different from sizeof (FileHeader) bec...
Definition: FileHeader.cpp:26
uint64_t initial_timestamp
the timestamp for the initial checkpointing
Definition: FileHeader.h:107
char originator[64]
the originator of the message (host:port)
Definition: FileHeader.h:127
uint64_t size
the size of this header plus the updates
Definition: FileHeader.h:97
static bool file_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
Definition: FileHeader.h:89
uint64_t last_timestamp
the timestamp for the last checkpoint
Definition: FileHeader.h:112
uint64_t states
the number of states checkpointed in the file stream
Definition: FileHeader.h:102
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a FileHeader instance from a buffer and updates the amount of buffer room remaining.
Definition: FileHeader.cpp:31
uint32_t karl_version
Version of KaRL installed when file was created.
Definition: FileHeader.h:122
This class encapsulates an entry in a KnowledgeBase.
uint64_t clock
last modification lamport clock time
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining.
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining.
uint32_t updates
the number of knowledge variable updates in the message
uint64_t clock
the clock of the sender when the message was generated
uint64_t size
the size of this header plus the updates
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
T * get(void)
get the underlying pointer
Definition: ScopedArray.inl:78
constexpr string_t string
constexpr binary_t binary
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowlegeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.inl:134
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Definition: Utility.cpp:64
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
Duration sleep_until(TimeValue wake)
Definition: Utility.cpp:612
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:638
Copyright(c) 2020 Galois.