15 namespace sc = std::chrono;
29 "CheckpointReader::start:"
39 "CheckpointReader::start:"
40 " could not open file %s for reading. "
41 "Check that file exists and that permissions are appropriate.\n",
48 int length =
file.tellg();
52 "CheckpointReader::start:"
53 " file contains %d bytes.\n",
63 "CheckpointReader::start:"
64 " %d byte file header is greater than CheckpointSettings"
65 " buffer_size of %d.\n",
72 buffer <<
"CheckpointReader::start: ";
73 buffer << bytes <<
" is greater than CheckpointSettings ";
74 buffer <<
"max_buffer_size of ";
76 buffer <<
". Increase buffer_size or max_buffer_size ";
77 buffer <<
" to stop this exception.";
85 "CheckpointReader::start:"
86 " setting max_buffer to %d.\n",
94 "CheckpointReader::start:"
95 " setting max_buffer to %d.\n",
107 std::stringstream message;
108 message <<
"CheckpointReader::start: ";
111 message <<
" does not have enough room for an appropriate header";
119 "CheckpointReader::start:"
120 " reading file: %d bytes read.\n",
129 "CheckpointReader::start:"
130 " invalid file or wrong version. No contextual change.\n");
146 "CheckpointReader::start:"
147 " read File meta. Meta.size=%d. Meta.states=%d\n",
186 "CheckpointReader::next:"
187 " done at state=%d of meta.states=%d\n",
194 "CheckpointReader::next:"
195 " reading 64bit unsigned size at %d byte file offset\n",
204 std::stringstream message;
205 message <<
"CheckpointReader::next: ";
208 message <<
" does not have enough room for a checkpoint";
226 "CheckpointReader::next:"
227 " %d bytes is greater than existing buffer size of %d\n",
235 buffer <<
"CheckpointReader::next: ";
236 buffer << bytes <<
" is greater than CheckpointSettings ";
237 buffer <<
"max_buffer_size of ";
239 buffer <<
". Increase buffer_size or max_buffer_size ";
240 buffer <<
" to stop this exception.";
249 "CheckpointReader::start:"
250 " setting max_buffer to %d.\n",
266 "CheckpointReader::next:"
267 " %d state checkpoint size is %d\n",
276 "CheckpointReader::next:"
277 " reading %d bytes for full checkpoint\n",
282 std::stringstream message;
283 message <<
"CheckpointReader::next: ";
286 message <<
" does not have enough room for ";
288 message <<
" bytes noted in header";
296 "CheckpointReader::next:"
303 "CheckpointReader::next:"
304 " decoding with %d buffer filters with initial size of "
305 "%d bytes and total buffer of %d bytes\n",
317 "CheckpointReader::next: "
318 "decode () returned a negative encoding size. Bad filter/encode.");
326 "CheckpointReader::next: "
327 "Not enough room in buffer for message header");
331 "CheckpointReader::next:"
332 " Reading a checkpoint header with %d byte buffer remaining\n",
347 uint64_t updates_size =
351 "CheckpointReader::next:"
352 " read Checkpoint header. header.size=%d, updates.size=%d\n",
363 "CheckpointReader::next: "
364 "Not enough room in buffer for checkpoint");
368 "CheckpointReader::next:"
369 " state=%d, initial_state=%d, last_state=%d\n",
382 "CheckpointReader::next:"
383 " not a valid state, incrementing by %d bytes.\n",
407 "CheckpointReader::next:"
408 " read record (%d of %d): %s\n",
414 bool prefix_found =
false;
419 "CheckpointReader::next:"
420 " checking record %s against prefix %s\n",
427 "CheckpointReader::next:"
428 " record has the correct prefix.\n");
437 "CheckpointReader::next:"
438 " record does not have the correct prefix. Rejected.\n");
446 return {key, record};
454 uint64_t first_toi = 0UL - 1;
455 uint64_t prev_toi = 0UL - 1;
457 while (self->keep_running_.test_and_set())
459 auto cur =
self->reader_->next();
465 uint64_t cur_toi = cur.second.toi();
469 " record has toi %lu. prev: %lu. first: %lu,. start: %lu\n",
470 cur_toi, prev_toi, first_toi, start_time);
472 if (first_toi == 0UL - 1)
475 prev_toi = first_toi;
476 #ifdef MADARA_FEATURE_SIMTIME
477 if (self->settings_.playback_simtime)
479 utility::sim_time_notify(first_toi, NAN);
480 start_time = first_toi;
485 uint64_t target_time = cur_toi - first_toi + start_time;
488 "CheckpointPlayer: sleep_until: %lu\n", target_time);
494 self->context_->update_record_from_external(
495 cur.first, cur.second, self->update_settings_);
505 if (cur.first.empty())
512 std::cerr <<
"play_until: " << cur.second.toi() <<
" " << target_toi
515 if (cur.second.toi() >= target_toi)
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
An exception for bad file interactions.
An exception for attempting to access an invalid context1.
An exception for general memory errors like out-of-memory.
Plays back a checkpoint over time, based on recorded TOI.
KnowledgeUpdateSettings update_settings_
std::unique_ptr< CheckpointReader > reader_
ThreadSafeContext * context_
bool play_until(uint64_t target_toi)
Loads values from checkpoint until it reaches or exceeds toi given.
static void thread_main(CheckpointPlayer *self)
CheckpointSettings & checkpoint_settings
transport::MessageHeader checkpoint_header
void start()
Begin by reading any header information.
std::pair< std::string, KnowledgeRecord > next()
Get the next update from the checkpoint file.
utility::ScopedArray< char > buffer
size_t max_buffer_size
the max size the buffer can grow to
std::string originator
the originator id of the checkpoint
std::string version
the MADARA version
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
int decode(char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
uint64_t states
the number of states checkpointed in the file stream
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
std::string filename
path to files
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
filters::BufferFilters buffer_filters
buffer filters.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
This class encapsulates an entry in a KnowledgeBase.
uint64_t clock
last modification lamport clock time
void set_toi(uint64_t new_toi)
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining.
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
T * get_ptr(void)
get the underlying pointer
T * get(void)
get the underlying pointer
Provides functions and classes for the distributed knowledge base.
T get(const KnowledgeRecord &kr)
Get the value of a KnowlegeRecord.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Duration sleep_until(TimeValue wake)
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Copyright(c) 2020 Galois.