33 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
37 #ifndef _MADARA_NO_KARL_
39 interpreter_(new
madara::expression::Interpreter())
51 #ifndef _MADARA_NO_KARL_
66 MADARA_GUARD_TYPE guard(
mutex_);
71 key_ptr = &key_actual;
81 return &
map_[*key_ptr];
89 MADARA_GUARD_TYPE guard(
mutex_);
95 key_ptr = &key_actual;
105 auto iter =
map_.lower_bound(*key_ptr);
106 if (iter ==
map_.end() || iter->first != *key_ptr)
108 iter =
map_.emplace_hint(iter, std::piecewise_construct,
109 std::forward_as_tuple(*key_ptr), std::forward_as_tuple());
120 MADARA_GUARD_TYPE guard(
mutex_);
126 key_ptr = &key_actual;
136 KnowledgeMap::const_iterator found =
map_.find(*key_ptr);
144 MADARA_GUARD_TYPE guard(
mutex_);
154 record->quality = record->write_quality;
170 MADARA_GUARD_TYPE guard(
mutex_);
180 record->quality = record->write_quality;
194 const unsigned char* value,
size_t size,
197 MADARA_GUARD_TYPE guard(
mutex_);
207 record->quality = record->write_quality;
221 const unsigned char* value,
size_t size,
224 MADARA_GUARD_TYPE guard(
mutex_);
234 record->quality = record->write_quality;
250 int return_value = 0;
251 MADARA_GUARD_TYPE guard(
mutex_);
260 return_value = record->
read_file(filename);
261 record->quality = record->write_quality;
268 return return_value = -1;
281 MADARA_GUARD_TYPE guard(
mutex_);
286 key_ptr = &key_actual;
292 KnowledgeMap::iterator found =
map_.find(*key_ptr);
297 if (found !=
map_.end())
298 return map_[*key_ptr].quality;
312 MADARA_GUARD_TYPE guard(
mutex_);
317 key_ptr = &key_actual;
323 KnowledgeMap::iterator found =
map_.find(*key_ptr);
328 if (found !=
map_.end())
329 return map_[*key_ptr].write_quality;
338 uint32_t quality,
bool force_update,
344 MADARA_GUARD_TYPE guard(
mutex_);
349 key_ptr = &key_actual;
359 KnowledgeMap::iterator found =
map_.find(*key_ptr);
364 if (found ==
map_.end() || force_update || quality > found->second.quality)
365 map_[*key_ptr].quality = quality;
368 return map_[*key_ptr].quality;
378 MADARA_GUARD_TYPE guard(
mutex_);
383 key_ptr = &key_actual;
390 map_[*key_ptr].write_quality = quality;
406 MADARA_GUARD_TYPE guard(
mutex_);
411 key_ptr = &key_actual;
421 KnowledgeMap::iterator found =
map_.find(*key_ptr);
432 if (quality < found->second.quality)
437 else if (quality == found->second.quality && clock < found->second.clock)
441 else if (found->second == rhs)
446 auto ret =
map_.emplace(std::piecewise_construct,
447 std::forward_as_tuple(*key_ptr), std::make_tuple());
454 if (result != -2 && record.
quality != quality)
458 if (clock > record.
clock)
459 record.
clock = clock;
492 MADARA_GUARD_TYPE guard(
mutex_);
497 key_ptr = &key_actual;
507 KnowledgeMap::iterator found =
map_.find(*key_ptr);
518 if (quality < found->second.quality)
523 else if (quality == found->second.quality && clock < found->second.clock)
527 else if (found->second == rhs)
532 auto ret =
map_.emplace(std::piecewise_construct,
533 std::forward_as_tuple(*key_ptr), std::make_tuple());
540 if (result != -2 && record.
quality != quality)
544 if (clock > record.
clock)
545 record.
clock = clock;
570 const std::string& value, uint32_t quality, uint64_t clock,
578 MADARA_GUARD_TYPE guard(
mutex_);
583 key_ptr = &key_actual;
593 KnowledgeMap::iterator found =
map_.find(*key_ptr);
604 if (quality < found->second.quality)
609 else if (quality == found->second.quality && clock < found->second.clock)
613 else if (found->second == rhs)
618 auto ret =
map_.emplace(std::piecewise_construct,
619 std::forward_as_tuple(*key_ptr), std::make_tuple());
626 if (result != -2 && record.
quality != quality)
630 if (clock > record.
clock)
631 record.
clock = clock;
664 MADARA_GUARD_TYPE guard(
mutex_);
669 key_ptr = &key_actual;
679 KnowledgeMap::iterator found =
map_.find(*key_ptr);
686 if (rhs.
quality < found->second.quality)
691 else if (rhs.
quality == found->second.quality &&
692 rhs.
clock < found->second.clock)
696 found->second.set_full(rhs);
703 auto ret =
map_.emplace(std::piecewise_construct,
704 std::forward_as_tuple(*key_ptr), std::forward_as_tuple(rhs));
709 ret.first->second = rhs;
716 if (rhs.
clock >= this->clock_)
736 MADARA_GUARD_TYPE guard(
mutex_);
744 if (rhs.
quality < record->quality)
749 else if (rhs.
quality == record->quality && rhs.
clock < record->clock)
759 if (rhs.
clock >= this->clock_)
771 changed_.MADARA_CONDITION_NOTIFY_ONE();
777 MADARA_GUARD_TYPE guard(
mutex_);
778 for (KnowledgeMap::const_iterator i =
map_.begin(); i !=
map_.end(); ++i)
780 if (i->second.exists())
783 i->second.to_string(
", ").c_str());
793 MADARA_GUARD_TYPE guard(
mutex_);
794 std::stringstream buffer;
798 for (KnowledgeMap::const_iterator i =
map_.begin(); i !=
map_.end(); ++i)
803 buffer << record_delimiter;
809 buffer << key_val_delimiter;
811 if (i->second.is_string_type())
815 else if (i->second.type() == i->second.DOUBLE_ARRAY ||
816 i->second.type() == i->second.INTEGER_ARRAY)
822 buffer << i->second.to_string(array_delimiter);
824 if (i->second.is_string_type())
828 else if (i->second.type() == i->second.DOUBLE_ARRAY ||
829 i->second.type() == i->second.INTEGER_ARRAY)
838 target = buffer.str();
848 MADARA_GUARD_TYPE guard(
mutex_);
852 size_t begin_exp = 0;
854 std::stringstream builder;
857 for (std::string::size_type i = 0; i < statement.size(); ++i)
860 if (statement[i] ==
'{')
867 else if (statement[i] ==
'}')
872 statement.substr(begin_exp + 1, i - begin_exp - 1);
874 builder << this->
get(results);
883 builder << statement[i];
891 "KARL COMPILE ERROR : Improperly matched braces in %s\n",
895 return builder.str();
898 #ifndef _MADARA_NO_KARL_
908 MADARA_GUARD_TYPE guard(
mutex_);
913 key_ptr = &key_actual;
933 MADARA_GUARD_TYPE guard(
mutex_);
938 key_ptr = &key_actual;
957 MADARA_GUARD_TYPE guard(
mutex_);
962 key_ptr = &key_actual;
975 #ifdef _MADARA_PYTHON_CALLBACKS_
982 MADARA_GUARD_TYPE guard(
mutex_);
987 key_ptr = &key_actual;
1012 #ifndef _MADARA_NO_KARL_
1016 MADARA_GUARD_TYPE guard(
mutex_);
1021 key_ptr = &key_actual;
1040 MADARA_GUARD_TYPE guard(
mutex_);
1045 key_ptr = &key_actual;
1060 "ThreadSafeContext::compile:"
1062 expression.c_str());
1064 MADARA_GUARD_TYPE guard(
mutex_);
1066 ce.
logic = expression;
1075 MADARA_GUARD_TYPE guard(
mutex_);
1082 MADARA_GUARD_TYPE guard(
mutex_);
1092 unsigned int start,
unsigned int end, std::vector<KnowledgeRecord>& target)
1097 MADARA_GUARD_TYPE guard(
mutex_);
1101 target.resize(end - start + 1);
1103 for (
unsigned int i = 0; start <= end; ++start, ++i)
1105 std::stringstream buffer;
1108 target[i] =
get(buffer.str());
1112 return target.size();
1116 std::map<std::string, knowledge::KnowledgeRecord>& target)
1121 bool matches_found(
false);
1124 if (subject[subject.size() - 1] ==
'*')
1125 subject.resize(subject.size() - 1);
1128 std::string::size_type subject_size = subject.size();
1129 const char* subject_ptr = subject.c_str();
1132 MADARA_GUARD_TYPE guard(
mutex_);
1135 if (expression.size() == 0)
1139 for (KnowledgeMap::iterator i =
map_.begin(); i !=
map_.end(); ++i)
1141 if (i->first.size() >= subject_size)
1143 int result = strncmp(i->first.c_str(), subject_ptr, subject_size);
1147 target[i->first] = i->second;
1148 matches_found =
true;
1150 else if (matches_found)
1159 return target.size();
1166 KnowledgeMap::iterator i =
map_.lower_bound(prefix);
1169 if (i !=
map_.end())
1172 KnowledgeMap::iterator first_match = i;
1173 KnowledgeMap::iterator after_matches =
map_.end();
1174 VariableReferences::iterator match;
1176 size_t num_matches = 0;
1178 size_t prefix_length = prefix.length();
1181 while (i !=
map_.end() && i->first.compare(0, prefix_length, prefix) == 0)
1194 matches.resize(num_matches);
1200 match = matches.begin();
1203 while (i != after_matches)
1221 std::vector<std::string>& next_keys,
1222 std::map<std::string, knowledge::KnowledgeRecord>& result,
bool just_keys)
1229 bool matches_found(
false);
1233 MADARA_GUARD_TYPE guard(
mutex_);
1235 KnowledgeMap::iterator i =
map_.begin();
1239 i =
map_.lower_bound(prefix);
1242 for (; i !=
map_.end(); ++i)
1259 matches_found =
true;
1271 result[i->first] = i->second;
1275 size_t prefix_end = prefix.length() + delimiter.length();
1278 i->first.substr(prefix.length(), delimiter.length());
1280 if (current_delimiter == delimiter && i->first.length() > prefix_end)
1283 size_t key_end = i->first.find(delimiter, prefix_end);
1287 i->first.substr(prefix_end, key_end - prefix_end));
1288 if (current_key != last_key)
1290 next_keys.push_back(current_key);
1291 last_key = current_key;
1297 return result.size();
1304 MADARA_GUARD_TYPE guard(
mutex_);
1306 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator> iters(
1309 map_.erase(iters.first, iters.second);
1313 std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1317 if (prefix.compare(0, prefix.size(), changed.first->first, prefix.size()) ==
1320 changed.second = changed.first;
1323 for (++changed.second; (prefix.compare(0, prefix.size(),
1324 changed.second->first, prefix.size()) == 0) &&
1336 std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1342 0, prefix.size(), local_changed.first->first, prefix.size()) == 0)
1344 local_changed.second = local_changed.first;
1347 for (++local_changed.second;
1348 (prefix.compare(0, prefix.size(), local_changed.second->first,
1349 prefix.size()) == 0) &&
1351 ++local_changed.second)
1360 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1363 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator> ret(
1367 if (prefix.size() > 0)
1369 ssize_t psz = prefix.size();
1372 ret.second = ret.first =
map_.lower_bound(prefix);
1375 while (ret.second !=
map_.end() &&
1376 ret.second->first.compare(0, psz, prefix) == 0)
1382 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1385 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> ret(
1389 if (prefix.size() > 0)
1391 ssize_t psz = prefix.size();
1394 ret.second = ret.first =
map_.lower_bound(prefix);
1397 while (ret.second !=
map_.end() &&
1398 ret.second->first.compare(0, psz, prefix) == 0)
1407 MADARA_GUARD_TYPE guard(
mutex_);
1409 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1419 MADARA_GUARD_TYPE guard(
mutex_);
1421 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1426 for (; iters.first != iters.second; ++iters.first)
1428 ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1429 iters.first->second);
1438 "ThreadSafeContext::copy:"
1439 " copying a context\n");
1444 "ThreadSafeContext::copy:"
1445 " clearing knowledge in target context\n");
1454 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1457 if (predicate.suffix ==
"")
1460 "ThreadSafeContext::copy:"
1461 " matching predicate.prefix=%s\n",
1462 predicate.prefix.c_str());
1464 for (; iters.first != iters.second; ++iters.first)
1467 "ThreadSafeContext::copy:"
1468 " looking for %s\n",
1469 iters.first->first.c_str());
1471 auto where =
map_.lower_bound(iters.first->first);
1473 if (where ==
map_.end() || where->first != iters.first->first)
1476 "ThreadSafeContext::copy:"
1478 iters.first->first.c_str());
1480 where =
map_.emplace_hint(
1481 where, iters.first->first, iters.first->second);
1486 "ThreadSafeContext::copy:"
1487 " overwriting %s\n",
1488 iters.first->first.c_str());
1490 where->second = iters.first->second;
1499 "ThreadSafeContext::copy:"
1500 " matching predicate.suffix=%s\n",
1501 predicate.suffix.c_str());
1503 for (; iters.first != iters.second; ++iters.first)
1508 "ThreadSafeContext::copy:"
1509 " looking for %s\n",
1510 iters.first->first.c_str());
1512 auto where =
map_.lower_bound(iters.first->first);
1514 if (where ==
map_.end() || where->first != iters.first->first)
1517 "ThreadSafeContext::copy:"
1519 iters.first->first.c_str());
1521 where =
map_.emplace_hint(
1522 where, iters.first->first, iters.first->second);
1527 "ThreadSafeContext::copy:"
1528 " overwriting %s\n",
1529 iters.first->first.c_str());
1531 where->second = iters.first->second;
1543 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1544 source.
map_.begin(), source.
map_.end());
1546 for (; iters.first != iters.second; ++iters.first)
1549 KnowledgeMap::value_type(iters.first->first, iters.first->second));
1557 const CopySet& copy_set,
bool clean_copy,
1565 if (copy_set.size() == 0)
1567 for (KnowledgeMap::const_iterator i = source.
map_.begin();
1568 i != source.
map_.end(); ++i)
1570 map_[i->first] = (i->second);
1577 for (CopySet::const_iterator key = copy_set.begin(); key != copy_set.end();
1581 KnowledgeMap::const_iterator i = source.
map_.find(key->first);
1584 if (i != source.
map_.end())
1586 map_[i->first] = (i->second);
1607 "ThreadSafeContext::save_context:"
1608 " opening file %s\n",
1612 FILE* file = fopen(settings.
filename.c_str(),
"wb");
1626 int64_t buffer_remaining(max_buffer);
1629 "ThreadSafeContext::save_context:"
1630 " allocating %d byte buffer\n",
1635 char* current = buffer.
get_ptr();
1638 "ThreadSafeContext::save_context:"
1639 " generating file meta\n");
1650 current = meta.
write(current, buffer_remaining);
1661 current = checkpoint_header.
write(current, buffer_remaining);
1664 "ThreadSafeContext::save_context:"
1665 " writing records\n");
1668 MADARA_GUARD_TYPE guard(
mutex_);
1670 for (KnowledgeMap::const_iterator i =
map_.begin(); i !=
map_.end(); ++i)
1672 if (i->second.exists())
1678 "ThreadSafeContext::save_context:"
1679 " we have %d prefixes to check against.\n",
1682 bool prefix_found =
false;
1683 for (
size_t j = 0; j < settings.
prefixes.size() && !prefix_found; ++j)
1686 "ThreadSafeContext::save_context:"
1687 " checking record %s against prefix %s.\n",
1688 i->first.c_str(), settings.
prefixes[j].c_str());
1693 "ThreadSafeContext::save_context:"
1694 " record has the correct prefix.\n");
1696 prefix_found =
true;
1703 "ThreadSafeContext::save_context:"
1704 " record has the wrong prefix. Rejected.\n");
1710 auto pre_write = current;
1711 current = i->second.write(current, i->first, buffer_remaining);
1712 size_t encoded_size = current - pre_write;
1716 checkpoint_header.
size += encoded_size;
1721 current = checkpoint_header.
write(
1727 (
int)checkpoint_header.
size, (
int)max_buffer);
1732 "encode() returned -1 size. Cannot "
1733 "save a negative sized checkpoint.");
1741 meta.
size = (uint64_t)total;
1746 fseek(file, 0, SEEK_SET);
1749 "ThreadSafeContext::save_context:"
1750 " encoding with buffer filters: %d:%d bytes written to offset %d.\n",
1751 (
int)meta.
size, (
int)checkpoint_header.
size,
1762 "ThreadSafeContext::save_context:"
1763 " couldn't open context file: %s.\n",
1784 "ThreadSafeContext::save_as_karl:"
1785 " opening file %s\n",
1788 int64_t bytes_written(0);
1789 std::stringstream buffer;
1796 MADARA_GUARD_TYPE guard(
mutex_);
1798 for (KnowledgeMap::const_iterator i =
map_.begin(); i !=
map_.end(); ++i)
1800 if (i->second.exists())
1806 "ThreadSafeContext::save_as_karl:"
1807 " we have %d prefixes to check against.\n",
1810 bool prefix_found =
false;
1811 for (
size_t j = 0; j < settings.
prefixes.size() && !prefix_found; ++j)
1814 "ThreadSafeContext::save_as_karl:"
1815 " checking record %s against prefix %s.\n",
1816 i->first.c_str(), settings.
prefixes[j].c_str());
1821 "ThreadSafeContext::save_as_karl:"
1822 " the record has the correct prefix.\n");
1824 prefix_found =
true;
1831 "ThreadSafeContext::save_as_karl:"
1832 " the record does not have a correct prefix.\n");
1841 if (!i->second.is_binary_file_type())
1844 if (i->second.is_string_type())
1849 else if (i->second.type() ==
1857 buffer << i->second;
1858 if (i->second.is_string_type())
1863 else if (i->second.type() ==
1873 buffer <<
"#read_file ('";
1893 path, (
void*)&(*i->second.file_value_)[0], i->second.size());
1907 char* result_copy =
new char[settings.
buffer_size];
1908 memcpy(result_copy, result.c_str(), result.size() + 1);
1910 int size = settings.
encode(
1911 result_copy, (
int)result.size(), (
int)settings.
buffer_size);
1916 "ThreadSafeContext::save_as_karl: "
1917 "encode() returned -1. Incorrect filter.");
1920 file.write(result_copy, size);
1921 bytes_written = (int64_t)size;
1925 file.write(result.c_str(), result.size());
1926 bytes_written = (int64_t)result.size();
1934 "ThreadSafeContext::save_as_karl:"
1935 " couldn't open karl file: %s.\n",
1941 return bytes_written;
1956 "ThreadSafeContext::save_as_json:"
1957 " opening file %s\n",
1960 int64_t bytes_written(0);
1962 std::stringstream buffer;
1964 file.open(settings.
filename.c_str());
1969 MADARA_GUARD_TYPE guard(
mutex_);
1973 for (KnowledgeMap::const_iterator i =
map_.begin(); i !=
map_.end(); ++i)
1975 if (i->second.exists())
1981 "ThreadSafeContext::save_as_json:"
1982 " we have %d prefixes to check against.\n",
1985 bool prefix_found =
false;
1986 for (
size_t j = 0; j < settings.
prefixes.size() && !prefix_found; ++j)
1989 "ThreadSafeContext::save_as_json:"
1990 " checking record %s against prefix %s.\n",
1991 i->first.c_str(), settings.
prefixes[j].c_str());
1996 "ThreadSafeContext::save_as_json:"
1997 " the record has the correct prefix.\n");
1999 prefix_found =
true;
2006 "ThreadSafeContext::save_as_json:"
2007 " the record does not have a correct prefix.\n");
2017 if (!i->second.is_binary_file_type())
2020 if (i->second.is_string_type())
2025 else if (i->second.type() ==
2033 buffer << i->second;
2034 if (i->second.is_string_type())
2039 else if (i->second.type() ==
2049 buffer <<
"#read_file ('";
2069 path, (
void*)&(*i->second.file_value_)[0], i->second.size());
2075 KnowledgeMap::const_iterator j(i);
2077 if (++j !=
map_.end())
2087 bytes_written = (int64_t)result.size();
2094 "ThreadSafeContext::save_as_json:"
2095 " couldn't open json file: %s.\n",
2101 return bytes_written;
2108 checkpoint_settings.
filename = filename;
2118 "ThreadSafeContext::load_context:"
2119 " opening file %s for just header info\n",
2123 FILE* file = fopen(filename.c_str(),
"rb");
2125 int64_t total_read(0);
2129 int64_t max_buffer(102800);
2130 int64_t buffer_remaining(max_buffer);
2133 const char* current = buffer.
get_ptr();
2136 "ThreadSafeContext::load_context:"
2137 " reading file meta data\n");
2139 total_read = fread(buffer.
get_ptr(), 1, max_buffer, file);
2140 buffer_remaining = (int64_t)total_read;
2147 current = meta.
read(current, buffer_remaining);
2153 "ThreadSafeContext::load_context:"
2154 " invalid file or wrong version. No contextual change.\n");
2162 "ThreadSafeContext::load_context:"
2163 " could not open file %s for reading. "
2164 "Check that file exists and that permissions are appropriate.\n",
2169 checkpoint_settings.
filename = filename;
2179 "ThreadSafeContext::evaluate_file:"
2180 " opening file %s\n",
2181 checkpoint_settings.
filename.c_str());
2183 #ifndef _MADARA_NO_KARL_
2187 return evaluate(expression, update_settings);
2198 "ThreadSafeContext::file_to_string:"
2199 " opening file %s\n",
2200 checkpoint_settings.
filename.c_str());
2202 FILE* file = fopen(checkpoint_settings.
filename.c_str(),
"rb");
2204 int64_t total_read(0);
2214 int64_t max_buffer(checkpoint_settings.
buffer_size);
2218 total_read = fread(buffer.
get_ptr(), 1, max_buffer, file);
2221 "ThreadSafeContext::file_to_string:"
2222 " reading file: %d bytes read. Decoding...\n",
2226 int size = checkpoint_settings.
decode(
2227 buffer.
get_ptr(), (
int)total_read, (
int)max_buffer);
2232 "ThreadSafeContext::file_to_string: "
2233 "decode () returned a negative encoding size. Bad filter/encode.");
2237 "ThreadSafeContext::file_to_string:"
2238 " decoded %d bytes. Converting to string.\n",
2244 "ThreadSafeContext::file_to_string:"
2245 " reading file: %d bytes read.\n",
2249 "ThreadSafeContext::file_to_string:"
2250 " file_contents: %s.\n",
2280 auto cur = reader.
next();
2281 if (cur.first.empty())
2286 cur.second.clock =
clock_;
2296 char* current = buffer.
get_ptr();
2297 const char* meta_reader = current;
2301 file.seekg(0, file.beg);
2307 "ThreadSafeContext::save_checkpoint:"
2308 " failed to read existing file header: size=%d\n",
2312 "ThreadSafeContext::save_checkpoint:"
2313 "Checkpoint file appears to have been corrupted. Bad header.");
2316 meta_reader = meta.
read(meta_reader, buffer_remaining);
2319 "ThreadSafeContext::save_checkpoint:"
2320 " init file meta: size=%d, states=%d\n",
2326 "ThreadSafeContext::save_checkpoint:"
2327 " setting file meta id to %s\n",
2342 "ThreadSafeContext::save_checkpoint:"
2343 " meta.size=%d, chkpt.header.size=%d \n",
2344 (
int)meta.
size, (
int)checkpoint_header.
size);
2358 checkpoint_header.
clock = clock_;
2361 return checkpoint_start;
2369 char* current = buffer.
get_ptr();
2372 "ThreadSafeContext::save_checkpoint:"
2373 " creating file meta. file.meta.size=%d, state.size=%d\n",
2385 current = meta.
write(current, buffer_remaining);
2393 checkpoint_header.
clock = clock_;
2396 current = checkpoint_header.
write(current, buffer_remaining);
2413 "ThreadSafeContext::save_checkpoint:"
2414 " we have %d prefixes to check against.\n",
2417 bool prefix_found =
false;
2418 for (
size_t j = 0; j < settings.
prefixes.size() && !prefix_found; ++j)
2421 "ThreadSafeContext::save_checkpoint:"
2422 " checking record %s against prefix %s.\n",
2423 name.c_str(), settings.
prefixes[j].c_str());
2427 prefix_found =
true;
2434 "ThreadSafeContext::save_checkpoint:"
2435 " record has the wrong prefix. Rejected.\n");
2445 "ThreadSafeContext::save_checkpoint:"
2446 " estimated encoded size of update=%d bytes\n",
2449 if (encoded_size > buffer_remaining)
2452 "ThreadSafeContext::save_checkpoint: "
2453 "%d buffer size is not big enough. Partial encoded size "
2454 "already hit %d bytes. CheckpointSettings.buffer_size is "
2455 "needs to be larger.");
2458 auto pre_write = current;
2459 current = record->
write(current, name, buffer_remaining);
2460 encoded_size = current - pre_write;
2463 checkpoint_header.
size += (uint64_t)encoded_size;
2466 "ThreadSafeContext::save_checkpoint:"
2467 " chkpt.header.size=%d, current->buffer delta=%d\n",
2468 (
int)checkpoint_header.
size, (
int)(current - buffer.
get_ptr()));
2474 class ContextLocalModifiedsLister :
public VariablesLister
2477 ContextLocalModifiedsLister(
const ThreadSafeContext& context)
2482 void start(
const CheckpointSettings& settings)
override
2488 if (settings.reset_checkpoint)
2494 std::pair<const char*, const KnowledgeRecord*> next()
override
2496 std::pair<const char*, const KnowledgeRecord*> ret{
nullptr,
nullptr};
2508 ret.first =
iter_->first;
2509 ret.second =
iter_->second.get_record_unsafe();
2530 ContextLocalModifiedsLister default_lister(context);
2536 lister = &default_lister;
2539 lister->
start(settings);
2542 auto e = lister->
next();
2543 if (e.second ==
nullptr)
2548 auto record = e.second;
2551 checkpoint_header, current, buffer, buffer_remaining);
2560 int64_t total_written(0);
2563 int64_t buffer_remaining(max_buffer);
2567 settings, file, meta, checkpoint_header, buffer_remaining, buffer);
2575 "ThreadSafeContext::save_checkpoint:"
2576 " fseek set to %d\n",
2577 (
int)(checkpoint_start));
2581 file.seekp(checkpoint_start);
2584 char* current = checkpoint_header.
write(buffer.
get_ptr(), buffer_remaining);
2587 "ThreadSafeContext::save_checkpoint:"
2588 " chkpt.header.size=%d, current->buffer delta=%d\n",
2590 (
int)(current - buffer.
get_ptr()));
2593 current, buffer, buffer_remaining);
2598 "ThreadSafeContext::save_checkpoint:"
2599 " writing final data to checkpoint for state #%d\n",
2603 "ThreadSafeContext::save_checkpoint:"
2604 " chkpt.header: size=%d, updates=%d\n",
2605 (
int)checkpoint_header.
size, (
int)checkpoint_header.
updates);
2607 int total_encoded = 0;
2609 if (buffer_remaining != max_buffer)
2611 total_written = (int64_t)(current - buffer.
get_ptr());
2614 "ThreadSafeContext::save_checkpoint:"
2615 " encoding %d bytes in checkpoint\n",
2616 (
int)total_written);
2618 current = checkpoint_header.
write(buffer.
get_ptr(), buffer_remaining);
2621 total_encoded = settings.
encode(
2622 buffer.
get_ptr(), (
int)total_written, (
int)max_buffer);
2624 if (total_encoded < 0)
2627 "ThreadSafeContext::save_context: "
2628 "encode () returned a negative encoding size. Bad filter/encode.");
2632 "ThreadSafeContext::save_checkpoint:"
2633 " encoded %d bytes in buffer. Writing to disk at offset %d bytes\n",
2634 (
int)total_encoded, (
int)checkpoint_start);
2641 file.write(buffer.
get_ptr(), total_encoded);
2644 meta.
size += (uint64_t)total_encoded;
2647 "ThreadSafeContext::save_checkpoint:"
2648 " meta.size updated to %d bytes\n",
2649 total_encoded, (
int)meta.
size);
2652 buffer_remaining = max_buffer;
2654 file.seekp(0, file.beg);
2657 "ThreadSafeContext::save_checkpoint:"
2658 " new file meta: size=%d, states=%d, "
2659 " lastchkpt.start=%d, lastchkpt.size=%d, encoded=%d\n",
2660 (
int)meta.
size, (
int)meta.
states, (
int)checkpoint_start,
2661 (
int)checkpoint_header.
size, (
int)total_encoded);
2667 "ThreadSafeContext::save_checkpoint:"
2668 " updating file meta data at beginning in the file\n");
2669 buffer_remaining = max_buffer;
2670 current = meta.
write(buffer.
get_ptr(), buffer_remaining);
2689 int64_t buffer_remaining(max_buffer);
2693 checkpoint_header, buffer_remaining, buffer);
2696 "ThreadSafeContext::save_checkpoint:"
2697 " writing diff records\n");
2700 current, buffer, buffer_remaining);
2702 char* final_position = current;
2703 int full_buffer = final_position - buffer.
get_ptr();
2706 "ThreadSafeContext::save_checkpoint:"
2707 " final_position indicates a buffer of %d bytes,"
2708 " encode buffer is %d bytes\n",
2709 full_buffer, full_buffer - file_header_size);
2713 checkpoint_header.
write(buffer.
get_ptr() + file_header_size, max_buffer);
2718 (
int)(full_buffer - file_header_size), (
int)max_buffer);
2723 "ThreadSafeContext::save_checkpoint: "
2724 "encode () returned a negative encoding size. Bad filter/encode.");
2727 meta.
size = (uint64_t)total;
2733 file.seekp(0, file.beg);
2736 "ThreadSafeContext::save_checkpoint:"
2737 " encoding with buffer filters: meta.size=%d, chkpt.size=%d, "
2738 "encoded=%d, chkpt.offset=%d.\n",
2739 (
int)meta.
size, (
int)checkpoint_header.
size, (
int)total,
2744 file.write(buffer.
get(), (
size_t)total + (
size_t)file_header_size);
2747 "ThreadSafeContext::save_checkpoint:"
2748 " wrote: %d bytes to file from beginning.\n",
2759 "ThreadSafeContext::save_checkpoint:"
2760 " opening file %s\n",
2774 *
this,
logger_,
clock_, settings, file, meta, checkpoint_header);
2779 "ThreadSafeContext::save_checkpoint:"
2780 " checkpoint doesn't exist. Creating.\n");
2794 "ThreadSafeContext::save_checkpoint:"
2795 " couldn't create checkpoint file: %s.\n",
2802 *
this,
logger_,
clock_, settings, file, meta, checkpoint_header);
2805 return checkpoint_header.
size;
#define madara_logger_checked_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
VariableReferenceMap map_
std::unique_ptr< ContextGuard > guard_
const ThreadSafeContext * context_
VariableReferenceMap::iterator iter_
madara::knowledge::KnowledgeRecord KnowledgeRecord
An exception for attempting to access an invalid context1.
An exception for general memory errors like out-of-memory.
An abstract base class defines a simple abstract implementation of an expression tree node.
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
Class for iterating binary checkpoint files.
void start()
Begin by reading any header information.
std::pair< std::string, KnowledgeRecord > next()
Get the next update from the checkpoint file.
bool is_open() const
Check if underlying file is open.
int64_t get_total_read() const
Get total number of bytes read so far during iteration.
Holds settings for checkpoints to load or save.
std::string originator
the originator id of the checkpoint
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoint...
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
VariablesLister * variables_lister
Object which will be used to extract variables for checkpoint saving.
int decode(char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
int encode(char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
std::string filename
path to files
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
filters::BufferFilters buffer_filters
buffer filters.
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints
Compiled, optimized KaRL logic.
std::string logic
the logic that was compiled
madara::expression::ExpressionTree expression
the expression tree
This class stores a function definition.
This class encapsulates an entry in a KnowledgeBase.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
void set_full(const KnowledgeRecord &new_value)
Sets the value and meta data from another KnowledgeRecord.
char * write(char *buffer, int64_t &buffer_remaining) const
Writes a KnowledgeRecord instance to a buffer and updates the amount of buffer room remaining.
uint64_t clock
last modification lamport clock time
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy toi, clock, and write_quality.
void set_toi(uint64_t new_toi)
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
uint32_t quality
priority of the update
int64_t get_encoded_size(const std::string &key) const
Returns the encoded size of the record.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
Settings for applying knowledge updates.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false)
Settings for applying knowledge updates.
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
This class stores variables and their values for use by any entity needing state information in a thr...
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
~ThreadSafeContext(void)
Destructor.
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::string file_to_string(CheckpointSettings &checkpoint_settings)
Loads and returns a karl script from a file with encode/decode.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
ThreadSafeContext()
Constructor.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically returns the current value of a variable.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
KnowledgeRecord evaluate_file(CheckpointSettings &checkpoint_settings, const KnowledgeUpdateSettings &update_settings=KnowledgeUpdateSettings(true, true, true, false))
Loads and evaluates a karl script from a file.
logger::Logger * logger_
Logger for printing.
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
void mark_modified(const VariableReference &variable, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Marks the variable reference as updated for the purposes of sending or checkpointing knowledge (for g...
const VariableReferenceMap & get_local_modified(void) const
Retrieves a list of modified local variables.
MADARA_CONDITION_TYPE changed_
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
FunctionMap functions_
map of function names to functions
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
void mark_and_signal(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
VariableReferenceMap changed_map_
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
VariableReferenceMap local_changed_map_
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &reqs, const KnowledgeUpdateSettings &settings)
Copies variables and values from source to this context.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
std::vector< std::string > expansion_splitters_
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Optimized reference to a variable within the knowledge base.
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord Do not use this pointer unless you've locked the ...
KnowledgeMap::value_type * pair_ptr
virtual std::pair< const char *, const KnowledgeRecord * > next()=0
virtual void start(const CheckpointSettings &settings)=0
Provides an interface for external functions into the MADARA KaRL variable settings.
A multi-threaded logger for logging to one or more destinations.
T * get_ptr(void)
get the underlying pointer
T * get(void)
get the underlying pointer
Provides functions and classes for the distributed knowledge base.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
T get(const KnowledgeRecord &kr)
Get the value of a KnowlegeRecord.
static void checkpoint_do_incremental(const ThreadSafeContext &context, logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header)
std::map< const char *, VariableReference, utility::ComparisonLessThan > VariableReferenceMap
a map of variable references
static char * init_checkpoint_header(logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, FileHeader &meta, transport::MessageHeader &checkpoint_header, int64_t &buffer_remaining, utility::ScopedArray< char > &buffer)
static void checkpoint_write_records(const ThreadSafeContext &context, logger::Logger *logger_, const CheckpointSettings &settings, transport::MessageHeader &checkpoint_header, char *¤t, utility::ScopedArray< char > &buffer, int64_t &buffer_remaining)
static uint64_t update_checkpoint_header(logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header, int64_t &buffer_remaining, utility::ScopedArray< char > &buffer)
static void checkpoint_write_record(logger::Logger *logger_, const std::string &name, const KnowledgeRecord *record, const CheckpointSettings &settings, transport::MessageHeader &checkpoint_header, char *¤t, utility::ScopedArray< char > &buffer, int64_t &buffer_remaining)
static void checkpoint_do_initial(const ThreadSafeContext &context, logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header)
std::vector< KnowledgeRecord > FunctionArguments
::std::map< std::string, KnowledgeRecord > KnowledgeMap
std::vector< VariableReference > VariableReferences
a vector of variable references
Provides knowledge logging services to files and terminals.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
MADARA_EXPORT bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
MADARA_EXPORT std::string extract_path(const std::string &name)
Extracts the path of a filename.
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Copyright(c) 2020 Galois.
Holds settings requirements for knowledge, usually in copying.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).