25 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
29 #ifndef _MADARA_NO_KARL_
31 interpreter_ (new
madara::expression::Interpreter ())
42 #ifndef _MADARA_NO_KARL_ 44 #endif // _MADARA_NO_KARL_ 60 MADARA_GUARD_TYPE guard (
mutex_);
65 key_ptr = &key_actual;
75 return &
map_[*key_ptr];
86 MADARA_GUARD_TYPE guard (
mutex_);
92 key_ptr = &key_actual;
101 auto iter =
map_.lower_bound(*key_ptr);
102 if (iter ==
map_.end() || iter->first != *key_ptr) {
103 iter =
map_.emplace_hint(iter, std::piecewise_construct,
104 std::forward_as_tuple(*key_ptr), std::forward_as_tuple());
118 MADARA_GUARD_TYPE guard (
mutex_);
126 key_ptr = &key_actual;
131 if (*key_ptr ==
"") {
135 KnowledgeMap::const_iterator found =
map_.find (*key_ptr);
143 const char * value,
size_t size,
147 MADARA_GUARD_TYPE guard (
mutex_);
154 record->write_quality < record->quality)
158 record->quality = record->write_quality;
172 const char * value,
size_t size,
175 MADARA_GUARD_TYPE guard (
mutex_);
182 record->write_quality < record->quality)
186 record->quality = record->write_quality;
200 const unsigned char * value,
size_t size,
203 MADARA_GUARD_TYPE guard (
mutex_);
210 record->write_quality < record->quality)
214 record->quality = record->write_quality;
228 const unsigned char * value,
size_t size,
231 MADARA_GUARD_TYPE guard (
mutex_);
238 record->write_quality < record->quality)
242 record->quality = record->write_quality;
259 int return_value = 0;
260 MADARA_GUARD_TYPE guard (
mutex_);
267 record->write_quality < record->quality)
270 return_value = record->
read_file (filename);
271 record->quality = record->write_quality;
276 return return_value = -1;
291 MADARA_GUARD_TYPE guard (
mutex_);
296 key_ptr = &key_actual;
302 KnowledgeMap::iterator found =
map_.find (*key_ptr);
307 if (found !=
map_.end ())
308 return map_[*key_ptr].quality;
324 MADARA_GUARD_TYPE guard (
mutex_);
329 key_ptr = &key_actual;
335 KnowledgeMap::iterator found =
map_.find (*key_ptr);
340 if (found !=
map_.end ())
341 return map_[*key_ptr].write_quality;
358 MADARA_GUARD_TYPE guard (
mutex_);
363 key_ptr = &key_actual;
373 KnowledgeMap::iterator found =
map_.find (*key_ptr);
378 if (found ==
map_.end () || force_update || quality > found->second.quality)
379 map_[*key_ptr].quality = quality;
382 return map_[*key_ptr].quality;
394 MADARA_GUARD_TYPE guard (
mutex_);
399 key_ptr = &key_actual;
406 map_[*key_ptr].write_quality = quality;
416 uint32_t quality, uint64_t clock,
424 MADARA_GUARD_TYPE guard (
mutex_);
429 key_ptr = &key_actual;
439 KnowledgeMap::iterator found =
map_.find (*key_ptr);
450 if (quality < found->second.quality)
455 else if (quality == found->second.quality &&
456 clock < found->second.clock)
460 else if (found->second == rhs)
463 auto ret =
map_.emplace(std::piecewise_construct,
464 std::forward_as_tuple(*key_ptr),
472 if (result != -2 && record.
quality != quality)
476 if (clock > record.
clock)
477 record.
clock = clock;
502 uint32_t quality, uint64_t clock,
510 MADARA_GUARD_TYPE guard (
mutex_);
515 key_ptr = &key_actual;
525 KnowledgeMap::iterator found =
map_.find (*key_ptr);
536 if (quality < found->second.quality)
541 else if (quality == found->second.quality &&
542 clock < found->second.clock)
546 else if (found->second == rhs)
549 auto ret =
map_.emplace(std::piecewise_construct,
550 std::forward_as_tuple(*key_ptr),
558 if (result != -2 && record.
quality != quality)
562 if (clock > record.
clock)
563 record.
clock = clock;
588 uint32_t quality, uint64_t clock,
596 MADARA_GUARD_TYPE guard (
mutex_);
601 key_ptr = &key_actual;
611 KnowledgeMap::iterator found =
map_.find (*key_ptr);
622 if (quality < found->second.quality)
627 else if (quality == found->second.quality &&
628 clock < found->second.clock)
632 else if (found->second == rhs)
635 auto ret =
map_.emplace(std::piecewise_construct,
636 std::forward_as_tuple(*key_ptr),
644 if (result != -2 && record.
quality != quality)
648 if (clock > record.
clock)
649 record.
clock = clock;
682 MADARA_GUARD_TYPE guard (
mutex_);
687 key_ptr = &key_actual;
697 KnowledgeMap::iterator found =
map_.find (*key_ptr);
704 if (rhs.
quality < found->second.quality)
709 else if (rhs.
quality == found->second.quality &&
710 rhs.
clock < found->second.clock)
714 found->second.set_value (rhs);
721 auto ret =
map_.emplace(std::piecewise_construct,
722 std::forward_as_tuple(*key_ptr),
733 if (rhs.
clock >= this->clock_)
755 MADARA_GUARD_TYPE guard (
mutex_);
763 if (rhs.
quality < record->quality)
768 else if (rhs.
quality == record->quality &&
769 rhs.
clock < record->clock)
779 if (rhs.
clock >= this->clock_)
792 changed_.MADARA_CONDITION_NOTIFY_ONE ();
798 unsigned int level)
const 800 MADARA_GUARD_TYPE guard (
mutex_);
801 for (KnowledgeMap::const_iterator i =
map_.begin ();
805 if (i->second.exists ())
808 i->first.c_str (), i->second.to_string (
", ").c_str ());
821 MADARA_GUARD_TYPE guard (
mutex_);
822 std::stringstream buffer;
826 for (KnowledgeMap::const_iterator i =
map_.begin ();
833 buffer << record_delimiter;
839 buffer << key_val_delimiter;
841 if (i->second.is_string_type ())
845 else if (i->second.type () == i->second.DOUBLE_ARRAY ||
846 i->second.type () == i->second.INTEGER_ARRAY)
852 buffer << i->second.to_string (array_delimiter);
854 if (i->second.is_string_type ())
858 else if (i->second.type () == i->second.DOUBLE_ARRAY ||
859 i->second.type () == i->second.INTEGER_ARRAY)
868 target = buffer.str ();
879 MADARA_GUARD_TYPE guard (
mutex_);
883 size_t begin_exp = 0;
885 std::stringstream builder;
888 for (std::string::size_type i = 0; i < statement.size (); ++i)
891 if (statement[i] ==
'{')
898 else if (statement[i] ==
'}')
903 statement.substr (begin_exp + 1, i - begin_exp - 1);
905 builder << this->
get (results);
914 builder << statement[i];
922 "KARL COMPILE ERROR : Improperly matched braces in %s\n",
926 return builder.str ();
929 #ifndef _MADARA_NO_KARL_ 941 MADARA_GUARD_TYPE guard (
mutex_);
946 key_ptr = &key_actual;
967 MADARA_GUARD_TYPE guard (
mutex_);
972 key_ptr = &key_actual;
993 MADARA_GUARD_TYPE guard (
mutex_);
998 key_ptr = &key_actual;
1011 #ifdef _MADARA_PYTHON_CALLBACKS_ 1014 boost::python::object callable,
1020 MADARA_GUARD_TYPE guard (
mutex_);
1025 key_ptr = &key_actual;
1056 MADARA_GUARD_TYPE guard (
mutex_);
1061 key_ptr = &key_actual;
1082 MADARA_GUARD_TYPE guard (
mutex_);
1087 key_ptr = &key_actual;
1105 "ThreadSafeContext::compile:" \
1106 " compiling %s\n", expression.c_str ());
1108 MADARA_GUARD_TYPE guard (
mutex_);
1110 ce.
logic = expression;
1121 MADARA_GUARD_TYPE guard (
mutex_);
1130 MADARA_GUARD_TYPE guard (
mutex_);
1137 #endif // _MADARA_NO_KARL_ 1144 std::vector <KnowledgeRecord> & target)
1149 MADARA_GUARD_TYPE guard (
mutex_);
1153 target.resize (end - start + 1);
1155 for (
unsigned int i = 0; start <= end; ++start, ++i)
1157 std::stringstream buffer;
1160 target[i] =
get (buffer.str ());
1164 return target.size ();
1171 std::map <std::string, knowledge::KnowledgeRecord> & target)
1176 bool matches_found (
false);
1179 if (subject[subject.size () - 1] ==
'*')
1180 subject.resize (subject.size () - 1);
1183 std::string::size_type subject_size = subject.size ();
1184 const char * subject_ptr = subject.c_str ();
1187 MADARA_GUARD_TYPE guard (
mutex_);
1190 if (expression.size () == 0)
1194 for (KnowledgeMap::iterator i =
map_.begin ();
1195 i !=
map_.end (); ++i)
1197 if (i->first.size () >= subject_size)
1199 int result = strncmp (i->first.c_str (), subject_ptr, subject_size);
1203 target[i->first] = i->second;
1204 matches_found =
true;
1206 else if (matches_found)
1216 return target.size ();
1227 KnowledgeMap::iterator i =
map_.lower_bound (prefix);
1230 if (i !=
map_.end ())
1233 KnowledgeMap::iterator first_match = i;
1234 KnowledgeMap::iterator after_matches =
map_.end ();
1235 VariableReferences::iterator match;
1237 size_t num_matches = 0;
1239 size_t prefix_length = prefix.length ();
1242 while (i !=
map_.end () &&
1243 i->first.compare (0, prefix_length, prefix) == 0)
1256 matches.resize (num_matches);
1262 match = matches.begin ();
1265 while (i != after_matches)
1269 match->assign (&*i);
1287 std::vector <std::string> & next_keys,
1288 std::map <std::string, knowledge::KnowledgeRecord> & result,
1296 bool matches_found (
false);
1300 MADARA_GUARD_TYPE guard (
mutex_);
1302 KnowledgeMap::iterator i =
map_.begin ();
1306 i =
map_.lower_bound (prefix);
1309 for (; i !=
map_.end (); ++i)
1326 matches_found =
true;
1338 result[i->first] = i->second;
1342 size_t prefix_end = prefix.length () + delimiter.length ();
1344 std::string current_delimiter = i->first.substr (prefix.length (), delimiter.length ());
1346 if (current_delimiter == delimiter && i->first.length () > prefix_end)
1349 size_t key_end = i->first.find (delimiter, prefix_end);
1353 i->first.substr (prefix_end, key_end - prefix_end));
1354 if (current_key != last_key)
1356 next_keys.push_back (current_key);
1357 last_key = current_key;
1365 return result.size ();
1374 MADARA_GUARD_TYPE guard (
mutex_);
1376 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1379 map_.erase (iters.first, iters.second);
1383 std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1387 if (prefix.compare (0, prefix.size(), changed.first->first, prefix.size()) == 0)
1389 changed.second = changed.first;
1392 for (++changed.second;
1393 (prefix.compare (0, prefix.size(), changed.second->first, prefix.size()) == 0) &&
1395 ++changed.second) {}
1403 std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1409 if (prefix.compare (0, prefix.size(), local_changed.first->first, prefix.size()) == 0)
1411 local_changed.second = local_changed.first;
1414 for (++local_changed.second;
1415 (prefix.compare (0, prefix.size(), local_changed.second->first, prefix.size()) == 0) &&
1417 ++local_changed.second) {}
1424 std::pair<KnowledgeMap::iterator,
1425 KnowledgeMap::iterator>
1429 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1433 if(prefix.size() > 0)
1435 ssize_t psz = prefix.size();
1438 ret.second = ret.first =
map_.lower_bound(prefix);
1441 while(ret.second !=
map_.end() &&
1442 ret.second->first.compare(0, psz, prefix) == 0)
1448 std::pair<KnowledgeMap::const_iterator,
1449 KnowledgeMap::const_iterator>
1453 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1457 if(prefix.size() > 0)
1459 ssize_t psz = prefix.size ();
1462 ret.second = ret.first =
map_.lower_bound(prefix);
1465 while(ret.second !=
map_.end() &&
1466 ret.second->first.compare(0, psz, prefix) == 0)
1477 MADARA_GUARD_TYPE guard (
mutex_);
1479 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1491 MADARA_GUARD_TYPE guard (
mutex_);
1493 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1498 for(;iters.first != iters.second; ++iters.first)
1500 ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1501 iters.first->second);
1512 "ThreadSafeContext::copy:" \
1513 " copying a context\n");
1518 "ThreadSafeContext::copy:" \
1519 " clearing knowledge in target context\n");
1528 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1531 if (predicate.suffix ==
"")
1534 "ThreadSafeContext::copy:" \
1535 " matching predicate.prefix=%s\n", predicate.prefix.c_str ());
1537 for(;iters.first != iters.second; ++iters.first)
1540 "ThreadSafeContext::copy:" \
1541 " looking for %s\n", iters.first->first.c_str ());
1543 auto where =
map_.lower_bound(iters.first->first);
1545 if (where ==
map_.end() || where->first != iters.first->first)
1548 "ThreadSafeContext::copy:" \
1549 " inserting %s\n", iters.first->first.c_str ());
1551 where =
map_.emplace_hint(where,
1552 iters.first->first, iters.first->second);
1557 "ThreadSafeContext::copy:" \
1558 " overwriting %s\n", iters.first->first.c_str ());
1560 where->second = iters.first->second;
1567 "ThreadSafeContext::copy:" \
1568 " matching predicate.suffix=%s\n", predicate.suffix.c_str ());
1570 for(;iters.first != iters.second; ++iters.first)
1576 "ThreadSafeContext::copy:" \
1577 " looking for %s\n", iters.first->first.c_str ());
1579 auto where =
map_.lower_bound(iters.first->first);
1581 if (where ==
map_.end() || where->first != iters.first->first)
1584 "ThreadSafeContext::copy:" \
1585 " inserting %s\n", iters.first->first.c_str ());
1587 where =
map_.emplace_hint(where,
1588 iters.first->first, iters.first->second);
1593 "ThreadSafeContext::copy:" \
1594 " overwriting %s\n", iters.first->first.c_str ());
1596 where->second = iters.first->second;
1607 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1608 iters (source.
map_.begin (), source.
map_.end ());
1610 for(;iters.first != iters.second; ++iters.first)
1612 map_.insert (
map_.begin (), KnowledgeMap::value_type(
1613 iters.first->first, iters.first->second));
1629 if (copy_set.size () == 0)
1631 for (KnowledgeMap::const_iterator i = source.
map_.begin ();
1632 i != source.
map_.end (); ++i)
1634 map_[i->first] = (i->second);
1640 for (CopySet::const_iterator key = copy_set.begin ();
1641 key != copy_set.end (); ++key)
1644 KnowledgeMap::const_iterator i = source.
map_.find (key->first);
1647 if (i != source.
map_.end ())
1649 map_[i->first] = (i->second);
1672 "ThreadSafeContext::save_context:" \
1673 " opening file %s\n", settings.
filename.c_str ());
1676 FILE * file = fopen (settings.
filename.c_str (),
"wb");
1680 strncpy (meta.originator, settings.
originator.c_str (),
1681 sizeof (meta.originator) < settings.
originator.size () + 1 ?
1682 sizeof (meta.originator) : settings.
originator.size () + 1);
1689 int64_t buffer_remaining (max_buffer);
1692 "ThreadSafeContext::save_context:" \
1693 " allocating %d byte buffer\n",
1698 char * current = buffer.
get_ptr ();
1701 "ThreadSafeContext::save_context:" \
1702 " generating file meta\n");
1704 meta.size += checkpoint_header.encoded_size ();
1705 checkpoint_header.size = checkpoint_header.encoded_size ();
1713 current = meta.write (current, buffer_remaining);
1721 checkpoint_header.clock =
clock_;
1724 current = checkpoint_header.write (current, buffer_remaining);
1727 "ThreadSafeContext::save_context:" \
1728 " writing records\n");
1731 MADARA_GUARD_TYPE guard (
mutex_);
1733 for (KnowledgeMap::const_iterator i =
map_.begin ();
1734 i !=
map_.end (); ++i)
1736 if (i->second.exists ())
1742 "ThreadSafeContext::save_context:" \
1743 " we have %d prefixes to check against.\n",
1746 bool prefix_found =
false;
1748 j < settings.
prefixes.size () && !prefix_found; ++j)
1751 "ThreadSafeContext::save_context:" \
1752 " checking record %s against prefix %s.\n",
1760 "ThreadSafeContext::save_context:" \
1761 " record has the correct prefix.\n");
1763 prefix_found =
true;
1770 "ThreadSafeContext::save_context:" \
1771 " record has the wrong prefix. Rejected.\n");
1778 int64_t encoded_size = i->second.get_encoded_size (i->first);
1779 ++checkpoint_header.updates;
1780 meta.size += encoded_size;
1781 checkpoint_header.size += encoded_size;
1783 current = i->second.write (current, i->first, buffer_remaining);
1788 current = meta.write (buffer.
get_ptr (), max_buffer);
1789 current = checkpoint_header.write (current, max_buffer);
1792 int total = settings.
encode ((
unsigned char *)buffer.
get_ptr (),
1793 (int)meta.size, (
int)max_buffer);
1796 fseek (file, 0, SEEK_SET);
1799 "ThreadSafeContext::save_context:" \
1800 " encoding with buffer filters: %d:%d bytes written.\n",
1801 (
int)meta.size, (
int)checkpoint_header.size);
1803 fwrite (buffer.
get_ptr (), (size_t)total, 1, file);
1810 "ThreadSafeContext::save_context:" \
1811 " couldn't open context file: %s.\n",
1838 "ThreadSafeContext::save_as_karl:" \
1839 " opening file %s\n", settings.
filename.c_str ());
1841 int64_t bytes_written (0);
1842 std::stringstream buffer;
1844 file.open (settings.
filename.c_str ());
1846 if (file.is_open ())
1849 MADARA_GUARD_TYPE guard (
mutex_);
1851 for (KnowledgeMap::const_iterator i =
map_.begin ();
1852 i !=
map_.end (); ++i)
1854 if (i->second.exists ())
1860 "ThreadSafeContext::save_as_karl:" \
1861 " we have %d prefixes to check against.\n",
1864 bool prefix_found =
false;
1866 j < settings.
prefixes.size () && !prefix_found; ++j)
1869 "ThreadSafeContext::save_as_karl:" \
1870 " checking record %s against prefix %s.\n",
1878 "ThreadSafeContext::save_as_karl:" \
1879 " the record has the correct prefix.\n");
1881 prefix_found =
true;
1888 "ThreadSafeContext::save_as_karl:" \
1889 " the record does not have a correct prefix.\n");
1898 if (!i->second.is_binary_file_type ())
1901 if (i->second.is_string_type ())
1913 buffer << i->second;
1914 if (i->second.is_string_type ())
1928 buffer <<
"#read_file ('";
1948 (
void *)&(*i->second.file_value_)[0], i->second.size ());
1962 bytes_written = (int64_t) result.size ();
1969 "ThreadSafeContext::save_as_karl:" \
1970 " couldn't open karl file: %s.\n",
1976 return bytes_written;
1996 "ThreadSafeContext::save_as_json:" \
1997 " opening file %s\n", settings.
filename.c_str ());
1999 int64_t bytes_written (0);
2001 std::stringstream buffer;
2003 file.open (settings.
filename.c_str ());
2005 if (file.is_open ())
2008 MADARA_GUARD_TYPE guard (
mutex_);
2012 for (KnowledgeMap::const_iterator i =
map_.begin ();
2013 i !=
map_.end (); ++i)
2015 if (i->second.exists ())
2021 "ThreadSafeContext::save_as_json:" \
2022 " we have %d prefixes to check against.\n",
2025 bool prefix_found =
false;
2027 j < settings.
prefixes.size () && !prefix_found; ++j)
2030 "ThreadSafeContext::save_as_json:" \
2031 " checking record %s against prefix %s.\n",
2039 "ThreadSafeContext::save_as_json:" \
2040 " the record has the correct prefix.\n");
2042 prefix_found =
true;
2049 "ThreadSafeContext::save_as_json:" \
2050 " the record does not have a correct prefix.\n");
2060 if (!i->second.is_binary_file_type ())
2063 if (i->second.is_string_type ())
2075 buffer << i->second;
2076 if (i->second.is_string_type ())
2090 buffer <<
"#read_file ('";
2110 (
void *)&(*i->second.file_value_)[0], i->second.size ());
2117 KnowledgeMap::const_iterator j (i);
2119 if (++j !=
map_.end ())
2129 bytes_written = (int64_t) result.size ();
2136 "ThreadSafeContext::save_as_json:" \
2137 " couldn't open json file: %s.\n",
2143 return bytes_written;
2154 checkpoint_settings.
filename = filename;
2167 "ThreadSafeContext::load_context:" \
2168 " opening file %s for just header info\n", filename.c_str ());
2171 FILE * file = fopen (filename.c_str (),
"rb");
2173 int64_t total_read (0);
2177 int64_t max_buffer (102800);
2178 int64_t buffer_remaining (max_buffer);
2181 const char * current = buffer.
get_ptr ();
2184 "ThreadSafeContext::load_context:" \
2185 " reading file meta data\n");
2187 total_read = fread (buffer.
get_ptr (),
2188 1, max_buffer, file);
2189 buffer_remaining = (int64_t)total_read;
2196 current = meta.
read (current, buffer_remaining);
2202 "ThreadSafeContext::load_context:" \
2203 " invalid file. No contextual change.\n");
2211 "ThreadSafeContext::load_context:" \
2212 " could not open file %s for reading. " 2213 "Check that file exists and that permissions are appropriate.\n",
2218 checkpoint_settings.
filename = filename;
2230 "ThreadSafeContext::load_context:" \
2231 " opening file %s\n", checkpoint_settings.
filename.c_str ());
2233 FILE * file = fopen (checkpoint_settings.
filename.c_str (),
"rb");
2235 int64_t total_read (0);
2245 int64_t max_buffer (checkpoint_settings.
buffer_size);
2246 int64_t buffer_remaining (max_buffer);
2249 const char * current = buffer.
get_ptr ();
2251 total_read = fread (buffer.
get_ptr (),
2252 1, max_buffer, file);
2253 buffer_remaining = (int64_t)total_read;
2256 "ThreadSafeContext::load_context:" \
2257 " reading file: %d bytes read.\n",
2261 checkpoint_settings.
decode ((
unsigned char *)buffer.
get_ptr (),
2262 (int)(total_read), (int)max_buffer);
2269 current = meta.
read (current, buffer_remaining);
2279 "ThreadSafeContext::load_context:" \
2280 " read File meta. Meta.size=%d\n", (
int)meta.
size);
2289 for (uint64_t state = 0; state < meta.
states &&
2290 state <= checkpoint_settings.
last_state; ++state)
2292 if (buffer_remaining > (int64_t)
2297 current = checkpoint_header.
read (current, buffer_remaining);
2304 if (state == meta.
states - 1)
2309 uint64_t updates_size = checkpoint_header.
size -
2313 "ThreadSafeContext::load_context:" \
2314 " read Checkpoint header. header.size=%d, updates.size=%d\n",
2315 (
int)checkpoint_header.
size, (
int)updates_size);
2322 if (updates_size > (uint64_t)buffer_remaining)
2329 new char[updates_size];
2330 memcpy (new_buffer.
get_ptr (), current,
2331 (size_t)buffer_remaining);
2334 total_read += fread (new_buffer.
get_ptr () + buffer_remaining, 1,
2336 - (uint64_t)buffer_remaining
2340 max_buffer = updates_size;
2341 buffer_remaining = checkpoint_header.
size 2343 current = new_buffer.
get_ptr ();
2344 buffer = new_buffer;
2348 "ThreadSafeContext::load_context:" \
2349 " state=%d, initial_state=%d, last_state=%d\n",
2353 if (state <= checkpoint_settings.
last_state &&
2356 for (uint32_t update = 0;
2357 update < checkpoint_header.
updates; ++update)
2361 current = record.
read (current, key, buffer_remaining);
2364 "ThreadSafeContext::load_context:" \
2365 " read record (%d of %d): %s\n",
2366 (
int)update, (
int)checkpoint_header.
updates, key.c_str ());
2369 if (checkpoint_settings.
prefixes.size () > 0)
2371 bool prefix_found =
false;
2372 for (
size_t j = 0; j < checkpoint_settings.
prefixes.size ()
2373 && !prefix_found; ++j)
2376 "ThreadSafeContext::load_context:" \
2377 " checking record %s against prefix %s\n",
2378 key.c_str (), checkpoint_settings.
prefixes[j].c_str ());
2381 key, checkpoint_settings.
prefixes[j]))
2384 "ThreadSafeContext::load_context:" \
2385 " record has the correct prefix.\n");
2387 prefix_found =
true;
2394 "ThreadSafeContext::load_context:" \
2395 " record does not have the correct prefix. Rejected.\n");
2407 "ThreadSafeContext::load_context:" \
2408 " not a valid state, incrementing by %d bytes.\n",
2411 current += updates_size;
2415 if (buffer_remaining == 0 && (uint64_t)total_read < meta.
size)
2417 buffer_remaining = max_buffer;
2419 total_read += fread (buffer.
get_ptr (), 1, buffer_remaining, file);
2427 "ThreadSafeContext::load_context:" \
2428 " invalid file. No contextual change.\n");
2436 "ThreadSafeContext::load_context:" \
2437 " could not open file %s for reading. " 2438 "Check that file exists and that permissions are appropriate.\n",
2439 checkpoint_settings.
filename.c_str ());
2451 "ThreadSafeContext::save_checkpoint:" \
2452 " opening file %s\n", settings.
filename.c_str ());
2454 int64_t total_written (0);
2455 FILE * file = fopen (settings.
filename.c_str (),
"rb+");
2463 int64_t buffer_remaining (max_buffer);
2466 char * current = buffer.
get_ptr ();
2467 const char * meta_reader = current;
2470 fseek (file, 0, SEEK_SET);
2471 size_t ret = fread (current, meta.encoded_size (), 1, file);
2474 "ThreadSafeContext::save_checkpoint:" \
2475 " failed to read existing file header: size=%d\n",
2476 (
int)meta.encoded_size ());
2481 meta_reader = meta.read (meta_reader, buffer_remaining);
2484 "ThreadSafeContext::save_checkpoint:" \
2485 " init file meta: size=%d, states=%d\n",
2486 (
int)meta.size, (
int)meta.states);
2491 "ThreadSafeContext::save_checkpoint:" \
2492 " setting file meta id to %s\n",
2495 strncpy (meta.originator, settings.
originator.c_str (),
2496 sizeof (meta.originator) < settings.
originator.size () + 1 ?
2497 sizeof (meta.originator) : settings.
originator.size () + 1);
2501 uint64_t checkpoint_start = meta.size;
2506 "ThreadSafeContext::save_checkpoint:" \
2507 " meta.size=%d, chkpt.header.size=%d \n",
2508 (
int)meta.size, (
int)checkpoint_header.
size);
2527 if (local_records.size () != 0)
2532 "ThreadSafeContext::save_checkpoint:" \
2533 " fseek set to %d\n",
2534 (
int)(checkpoint_start));
2537 fseek (file, (
long)checkpoint_start, SEEK_SET);
2540 current = checkpoint_header.
write (buffer.
get_ptr (), buffer_remaining);
2543 "ThreadSafeContext::save_checkpoint:" \
2544 " chkpt.header.size=%d, current->buffer delta=%d\n",
2546 (int)(current - buffer.
get_ptr ()));
2550 MADARA_GUARD_TYPE guard (
mutex_);
2552 for (
const auto &e : local_records)
2554 auto record = e.second.get_record_unsafe();
2556 if (record->exists ())
2561 bool prefix_found =
false;
2563 j < settings.
prefixes.size () && !prefix_found; ++j)
2566 record->to_string (), settings.
prefixes[j]))
2568 prefix_found =
true;
2577 int64_t encoded_size = record->get_encoded_size (e.first);
2580 "ThreadSafeContext::save_checkpoint:" \
2581 " estimated encoded size of update=%d bytes\n",
2584 if (encoded_size > buffer_remaining)
2587 (
size_t)(max_buffer - buffer_remaining), 1, file);
2588 total_written += (int64_t)(max_buffer - buffer_remaining);
2589 buffer_remaining = max_buffer;
2592 "ThreadSafeContext::save_checkpoint:" \
2593 " encoded_size larger than remaining buffer. Flushing\n");
2595 if (encoded_size > max_buffer)
2601 buffer =
new char[encoded_size];
2602 max_buffer = encoded_size;
2603 buffer_remaining = max_buffer;
2607 "ThreadSafeContext::save_checkpoint:" \
2608 " encoded_size larger than entire buffer. Reallocating\n");
2612 current = record->write (current, e.first, buffer_remaining);
2614 checkpoint_header.
size += (uint64_t)encoded_size;
2618 "ThreadSafeContext::save_checkpoint:" \
2619 " chkpt.header.size=%d, current->buffer delta=%d\n",
2620 (
int)checkpoint_header.
size, (
int)(current - buffer.
get_ptr ()));
2629 "ThreadSafeContext::save_checkpoint:" \
2630 " resetting checkpoint. Next checkpoint starts fresh here\n");
2637 "ThreadSafeContext::save_checkpoint:" \
2638 " writing final data for state #%d\n",
2641 if (buffer_remaining != max_buffer)
2644 (size_t)(current - buffer.
get_ptr ()), 1, file);
2645 total_written += (size_t)(current - buffer.
get_ptr ());
2648 "ThreadSafeContext::save_checkpoint:" \
2649 " current->buffer=%d bytes, max->remaining=%d bytes\n",
2650 (
int)(current - buffer.
get_ptr ()), (
int)max_buffer - buffer_remaining);
2654 "ThreadSafeContext::save_checkpoint:" \
2655 " chkpt.header: size=%d, updates=%d\n",
2656 (
int)checkpoint_header.
size, (
int)checkpoint_header.
updates);
2658 buffer_remaining = max_buffer;
2659 fseek (file, (
long)checkpoint_start, SEEK_SET);
2660 current = checkpoint_header.
write (buffer.
get_ptr (), buffer_remaining);
2661 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
2663 meta.size += checkpoint_header.
size;
2666 "ThreadSafeContext::save_checkpoint:" \
2667 " new file meta: size=%d, states=%d, lastchkpt.size=%d\n",
2668 (
int)meta.size, (
int)meta.states, (
int)checkpoint_header.
size);
2671 fseek (file, 0, SEEK_SET);
2674 "ThreadSafeContext::save_checkpoint:" \
2675 " updating file meta data in the file\n");
2677 buffer_remaining = max_buffer;
2678 current = meta.write (buffer.
get_ptr (), buffer_remaining);
2680 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
2689 "ThreadSafeContext::save_checkpoint:" \
2690 " checkpoint doesn't exist. Creating.\n");
2693 file = fopen (settings.
filename.c_str (),
"wb");
2696 strncpy (meta.originator, settings.
originator.c_str (),
2697 sizeof (meta.originator) < settings.
originator.size () + 1 ?
2698 sizeof (meta.originator) : settings.
originator.size () + 1);
2703 int64_t buffer_remaining (max_buffer);
2706 char * current = buffer.
get_ptr ();
2709 "ThreadSafeContext::save_checkpoint:" \
2710 " creating file meta. file.meta.size=%d, state.size=%d\n",
2711 (
int)meta.size, (
int)checkpoint_header.
encoded_size ());
2722 current = meta.write (current, buffer_remaining);
2733 current = checkpoint_header.
write (current, buffer_remaining);
2736 "ThreadSafeContext::save_checkpoint:" \
2737 " writing diff records\n");
2740 MADARA_GUARD_TYPE guard (
mutex_);
2744 for (
const auto &e : local_records)
2746 auto record = e.second.get_record_unsafe();
2748 if (record->exists ())
2754 "ThreadSafeContext::save_checkpoint:" \
2755 " we have %d prefixes to check against.\n",
2758 bool prefix_found =
false;
2760 j < settings.
prefixes.size () && !prefix_found; ++j)
2763 "ThreadSafeContext::save_checkpoint:" \
2764 " checking record %s against prefix %s.\n",
2772 "ThreadSafeContext::save_checkpoint:" \
2773 " record has the correct prefix.\n");
2775 prefix_found =
true;
2782 "ThreadSafeContext::save_checkpoint:" \
2783 " record has the wrong prefix. Rejected.\n");
2790 int64_t encoded_size = record->get_encoded_size (e.first);
2793 "ThreadSafeContext::save_checkpoint:" \
2794 " estimated encoded size of update=%d bytes\n",
2798 meta.size += encoded_size;
2799 checkpoint_header.
size += encoded_size;
2801 current = record->write (current, e.first, buffer_remaining);
2804 "ThreadSafeContext::save_checkpoint:" \
2805 " current->buffer delta=%d bytes\n",
2806 (
int)(current - buffer.
get_ptr ()));
2812 current = meta.write (buffer.
get_ptr (), max_buffer);
2813 current = checkpoint_header.
write (current, max_buffer);
2816 int total = settings.
encode ((
unsigned char *)buffer.
get_ptr (),
2817 (int)meta.size, (
int)max_buffer);
2820 fseek (file, 0, SEEK_SET);
2823 "ThreadSafeContext::save_checkpoint:" \
2824 " file size: %d bytes written (file:%d, state.size:%d).\n",
2825 (
int)total, (
int)meta.size, (
int)checkpoint_header.
size);
2827 fwrite (buffer.
get_ptr (), (size_t)total, 1, file);
2840 "ThreadSafeContext::save_checkpoint:" \
2841 " couldn't create checkpoint file: %s.\n",
2848 return checkpoint_header.
size;
This class encapsulates an entry in a KnowledgeBase.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false) ...
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).
std::string version
the MADARA version
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > expansion_splitters_
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
This class stores a function definition.
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
uint32_t quality
priority of the update
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
MADARA_CONDITION_TYPE changed_
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
madara::knowledge::KnowledgeRecord KnowledgeRecord
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
void mark_and_signal(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining...
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &settings)
Copies variables and values from source to this context.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord Do not use this pointer unless you've locked the ...
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states.
int encode(unsigned char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
This class stores variables and their values for use by any entity needing state information in a thr...
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
int decode(unsigned char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
std::string extract_path(const std::string &name)
Extracts the path of a filename.
Holds settings for checkpoints to load or save.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
std::string logic
the logic that was compiled
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Provides knowledge logging services to files and terminals.
Optimized reference to a variable within the knowledge base.
Compiled, optimized KaRL logic.
std::vector< KnowledgeRecord > FunctionArguments
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy clock and write_quality.
std::string originator
the originator id of the checkpoint
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
std::string filename
path to files
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
madara::expression::ExpressionTree expression
the expression tree
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
VariableReferenceMap changed_map_
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
DeepIterator< Iterator > deep_iterate(const Iterator &i)
Returns an input iterator from an iterator.
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
T * get_ptr(void)
get the underlying pointer
FunctionMap functions_
map of function names to functions
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoint...
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Holds settings requirements for knowledge, usually in copying.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
const VariableReferenceMap & get_local_modified(void) const
Retrieves a list of modified local variables.
An abstract base class defines a simple abstract implementation of an expression tree node...
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
std::vector< VariableReference > VariableReferences
a vector of variable references
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
KnowledgeMap::value_type * pair_ptr
logger::Logger * logger_
Logger for printing.
Provides functions and classes for the distributed knowledge base.
uint64_t states
the number of states checkpointed in the file stream
uint64_t last_timestamp
final wallclock time saved in the checkpoint
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints ...
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
void reset_checkpoint(void) const
Reset all checkpoint variables in the modified lists.
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
Settings for applying knowledge updates.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
Copyright (c) 2015 Carnegie Mellon University.
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
Settings for applying knowledge updates.
~ThreadSafeContext(void)
Destructor.
bool reset_checkpoint
If true, resets the checkpoint to start a new diff from this point forward.
Provides an interface for external functions into the MADARA KaRL variable settings.
ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
ThreadSafeContext()
Constructor.
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
MADARA_EXPORT bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
uint64_t clock
last modification time
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.
VariableReferenceMap local_changed_map_