MADARA  3.4.1
ThreadSafeContext.cpp
Go to the documentation of this file.
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <iterator>
5 #include <memory>
6 
7 #include <string.h>
8 
9 #include <stdio.h>
10 #include <time.h>
16 #include "madara/utility/Utility.h"
17 
20 
23 
25 
26 namespace madara
27 {
28 namespace knowledge
29 {
30 // constructor
32  :
33 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
34  changed_(mutex_),
35 #endif
36  clock_(0)
37 #ifndef _MADARA_NO_KARL_
38  ,
39  interpreter_(new madara::expression::Interpreter())
40 #endif // _MADARA_NO_KARL_
41  ,
42  logger_(logger::global_logger.get())
43 {
44  expansion_splitters_.push_back("{");
45  expansion_splitters_.push_back("}");
46 }
47 
48 // destructor
50 {
51 #ifndef _MADARA_NO_KARL_
52  delete interpreter_;
53 #endif // _MADARA_NO_KARL_
54 }
55 
62  const std::string& key, const KnowledgeReferenceSettings& settings)
63 {
64  std::string key_actual;
65  const std::string* key_ptr;
66  MADARA_GUARD_TYPE guard(mutex_);
67 
68  if (settings.expand_variables)
69  {
70  key_actual = expand_statement(key);
71  key_ptr = &key_actual;
72  }
73  else
74  key_ptr = &key;
75 
76  if (*key_ptr == "")
77  return 0;
78 
79  // if the variable doesn't exist, hash maps create a record automatically
80  // when used in this manner
81  return &map_[*key_ptr];
82 }
83 
85  const std::string& key, const KnowledgeReferenceSettings& settings)
86 {
87  std::string key_actual;
88  const std::string* key_ptr;
89  MADARA_GUARD_TYPE guard(mutex_);
90 
91  // expand the key if the user asked for it
92  if (settings.expand_variables)
93  {
94  key_actual = expand_statement(key);
95  key_ptr = &key_actual;
96  }
97  else
98  key_ptr = &key;
99 
100  if (*key_ptr == "")
101  {
102  return {};
103  }
104 
105  auto iter = map_.lower_bound(*key_ptr);
106  if (iter == map_.end() || iter->first != *key_ptr)
107  {
108  iter = map_.emplace_hint(iter, std::piecewise_construct,
109  std::forward_as_tuple(*key_ptr), std::forward_as_tuple());
110  }
111 
112  return &*iter;
113 }
114 
116  const std::string& key, const KnowledgeReferenceSettings& settings) const
117 {
118  std::string key_actual;
119  const std::string* key_ptr;
120  MADARA_GUARD_TYPE guard(mutex_);
121 
122  // expand the key if the user asked for it
123  if (settings.expand_variables)
124  {
125  key_actual = expand_statement(key);
126  key_ptr = &key_actual;
127  }
128  else
129  key_ptr = &key;
130 
131  if (*key_ptr == "")
132  {
133  return {};
134  }
135 
136  KnowledgeMap::const_iterator found = map_.find(*key_ptr);
137  return {const_cast<VariableReference::pair_ptr>(&*found)};
138 }
139 
140 // set the value of a variable
142  const char* value, size_t size, const KnowledgeUpdateSettings& settings)
143 {
144  MADARA_GUARD_TYPE guard(mutex_);
145  auto record = variable.get_record_unsafe();
146 
147  if (record)
148  {
149  // check if we have the appropriate write quality
150  if (!settings.always_overwrite && record->write_quality < record->quality)
151  return -2;
152 
153  record->set_xml(value, size);
154  record->quality = record->write_quality;
155  record->clock = clock_;
156  record->set_toi(utility::get_time());
157 
158  mark_and_signal(variable, settings);
159  }
160  else
161  return -1;
162 
163  return 0;
164 }
165 
166 // set the value of a variable
168  const char* value, size_t size, const KnowledgeUpdateSettings& settings)
169 {
170  MADARA_GUARD_TYPE guard(mutex_);
171  auto record = variable.get_record_unsafe();
172 
173  if (record)
174  {
175  // check if we have the appropriate write quality
176  if (!settings.always_overwrite && record->write_quality < record->quality)
177  return -2;
178 
179  record->set_text(value, size);
180  record->quality = record->write_quality;
181  record->clock = clock_;
182  record->set_toi(utility::get_time());
183 
184  mark_and_signal(variable, settings);
185  }
186  else
187  return -1;
188 
189  return 0;
190 }
191 
192 // set the value of a variable
194  const unsigned char* value, size_t size,
195  const KnowledgeUpdateSettings& settings)
196 {
197  MADARA_GUARD_TYPE guard(mutex_);
198  auto record = variable.get_record_unsafe();
199 
200  if (record)
201  {
202  // check if we have the appropriate write quality
203  if (!settings.always_overwrite && record->write_quality < record->quality)
204  return -2;
205 
206  record->set_jpeg(value, size);
207  record->quality = record->write_quality;
208  record->clock = clock_;
209  record->set_toi(utility::get_time());
210 
211  mark_and_signal(variable, settings);
212  }
213  else
214  return -1;
215 
216  return 0;
217 }
218 
219 // set the value of a variable
221  const unsigned char* value, size_t size,
222  const KnowledgeUpdateSettings& settings)
223 {
224  MADARA_GUARD_TYPE guard(mutex_);
225  auto record = variable.get_record_unsafe();
226 
227  if (record)
228  {
229  // check if we have the appropriate write quality
230  if (!settings.always_overwrite && record->write_quality < record->quality)
231  return -2;
232 
233  record->set_file(value, size);
234  record->quality = record->write_quality;
235  record->clock = clock_;
236  record->set_toi(utility::get_time());
237 
238  mark_and_signal(variable, settings);
239  }
240  else
241  return -1;
242 
243  return 0;
244 }
245 
246 // set the value of a variable
248  const std::string& filename, const KnowledgeUpdateSettings& settings)
249 {
250  int return_value = 0;
251  MADARA_GUARD_TYPE guard(mutex_);
252  auto record = variable.get_record_unsafe();
253 
254  if (record)
255  {
256  // check if we have the appropriate write quality
257  if (!settings.always_overwrite && record->write_quality < record->quality)
258  return -2;
259 
260  return_value = record->read_file(filename);
261  record->quality = record->write_quality;
262  record->clock = clock_;
263  record->set_toi(utility::get_time());
264 
265  mark_and_signal(variable, settings);
266  }
267  else
268  return return_value = -1;
269 
270  return return_value;
271 }
272 
276  const std::string& key, const KnowledgeReferenceSettings& settings)
277 {
278  // enter the mutex
279  std::string key_actual;
280  const std::string* key_ptr;
281  MADARA_GUARD_TYPE guard(mutex_);
282 
283  if (settings.expand_variables)
284  {
285  key_actual = expand_statement(key);
286  key_ptr = &key_actual;
287  }
288  else
289  key_ptr = &key;
290 
291  // find the key in the knowledge base
292  KnowledgeMap::iterator found = map_.find(*key_ptr);
293 
294  // create the variable if it has never been written to before
295  // and update its current value quality to the quality parameter
296 
297  if (found != map_.end())
298  return map_[*key_ptr].quality;
299 
300  // default quality is 0
301  return 0;
302 }
303 
307  const std::string& key, const KnowledgeReferenceSettings& settings)
308 {
309  // enter the mutex
310  std::string key_actual;
311  const std::string* key_ptr;
312  MADARA_GUARD_TYPE guard(mutex_);
313 
314  if (settings.expand_variables)
315  {
316  key_actual = expand_statement(key);
317  key_ptr = &key_actual;
318  }
319  else
320  key_ptr = &key;
321 
322  // find the key in the knowledge base
323  KnowledgeMap::iterator found = map_.find(*key_ptr);
324 
325  // create the variable if it has never been written to before
326  // and update its current value quality to the quality parameter
327 
328  if (found != map_.end())
329  return map_[*key_ptr].write_quality;
330 
331  // default quality is 0
332  return 0;
333 }
334 
338  uint32_t quality, bool force_update,
339  const KnowledgeReferenceSettings& settings)
340 {
341  // enter the mutex
342  std::string key_actual;
343  const std::string* key_ptr;
344  MADARA_GUARD_TYPE guard(mutex_);
345 
346  if (settings.expand_variables)
347  {
348  key_actual = expand_statement(key);
349  key_ptr = &key_actual;
350  }
351  else
352  key_ptr = &key;
353 
354  // check for null key
355  if (*key_ptr == "")
356  return 0;
357 
358  // find the key in the knowledge base
359  KnowledgeMap::iterator found = map_.find(*key_ptr);
360 
361  // create the variable if it has never been written to before
362  // and update its current value quality to the quality parameter
363 
364  if (found == map_.end() || force_update || quality > found->second.quality)
365  map_[*key_ptr].quality = quality;
366 
367  // return current quality
368  return map_[*key_ptr].quality;
369 }
370 
373  uint32_t quality, const KnowledgeReferenceSettings& settings)
374 {
375  // enter the mutex
376  std::string key_actual;
377  const std::string* key_ptr;
378  MADARA_GUARD_TYPE guard(mutex_);
379 
380  if (settings.expand_variables)
381  {
382  key_actual = expand_statement(key);
383  key_ptr = &key_actual;
384  }
385  else
386  key_ptr = &key;
387 
388  // create the variable if it has never been written to before
389  // and update its local process write quality to the quality parameter
390  map_[*key_ptr].write_quality = quality;
391 }
392 
398  KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock,
399  const KnowledgeUpdateSettings& settings)
400 {
401  int result = 1;
402 
403  // enter the mutex
404  std::string key_actual;
405  const std::string* key_ptr;
406  MADARA_GUARD_TYPE guard(mutex_);
407 
408  if (settings.expand_variables)
409  {
410  key_actual = expand_statement(key);
411  key_ptr = &key_actual;
412  }
413  else
414  key_ptr = &key;
415 
416  // check for null key
417  if (*key_ptr == "")
418  return -1;
419 
420  // find the key in the knowledge base
421  KnowledgeMap::iterator found = map_.find(*key_ptr);
422 
423  // if it's found, then compare the value
424  if (!settings.always_overwrite && found != map_.end())
425  {
426  // setup a rhs
427  KnowledgeRecord rhs;
428  rhs.set_value(value);
429 
430  // if we do not have enough quality to update the variable
431  // return -2
432  if (quality < found->second.quality)
433  result = -2;
434 
435  // if we have the same quality, but our clock value
436  // is less than what we've already seen, then return -3
437  else if (quality == found->second.quality && clock < found->second.clock)
438  result = -3;
439 
440  // check for value already set
441  else if (found->second == rhs)
442  result = 0;
443  }
444  else
445  {
446  auto ret = map_.emplace(std::piecewise_construct,
447  std::forward_as_tuple(*key_ptr), std::make_tuple());
448  found = ret.first;
449  }
450 
451  KnowledgeRecord& record = found->second;
452 
453  // if we need to update quality, then update it
454  if (result != -2 && record.quality != quality)
455  record.quality = quality;
456 
457  // if we need to update the variable clock, then update it
458  if (clock > record.clock)
459  record.clock = clock;
460 
461  // if we need to update the global clock, then update it
462  if (clock > this->clock_)
463  this->clock_ = clock;
464 
465  if (result == 1)
466  {
467  // we have a situation where the value needs to be changed
468  record.set_value(value);
469 
470  record.clock = clock_;
471  record.set_toi(utility::get_time());
472 
473  mark_and_signal(&*found, settings);
474  }
475 
476  // value was changed
477  return result;
478 }
479 
484 int ThreadSafeContext::set_if_unequal(const std::string& key, double value,
485  uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings& settings)
486 {
487  int result = 1;
488 
489  // enter the mutex
490  std::string key_actual;
491  const std::string* key_ptr;
492  MADARA_GUARD_TYPE guard(mutex_);
493 
494  if (settings.expand_variables)
495  {
496  key_actual = expand_statement(key);
497  key_ptr = &key_actual;
498  }
499  else
500  key_ptr = &key;
501 
502  // check for null key
503  if (*key_ptr == "")
504  return -1;
505 
506  // find the key in the knowledge base
507  KnowledgeMap::iterator found = map_.find(*key_ptr);
508 
509  // if it's found, then compare the value
510  if (!settings.always_overwrite && found != map_.end())
511  {
512  // setup a rhs
513  KnowledgeRecord rhs;
514  rhs.set_value(value);
515 
516  // if we do not have enough quality to update the variable
517  // return -2
518  if (quality < found->second.quality)
519  result = -2;
520 
521  // if we have the same quality, but our clock value
522  // is less than what we've already seen, then return -3
523  else if (quality == found->second.quality && clock < found->second.clock)
524  result = -3;
525 
526  // check for value already set
527  else if (found->second == rhs)
528  result = 0;
529  }
530  else
531  {
532  auto ret = map_.emplace(std::piecewise_construct,
533  std::forward_as_tuple(*key_ptr), std::make_tuple());
534  found = ret.first;
535  }
536 
537  KnowledgeRecord& record = found->second;
538 
539  // if we need to update quality, then update it
540  if (result != -2 && record.quality != quality)
541  record.quality = quality;
542 
543  // if we need to update the variable clock, then update it
544  if (clock > record.clock)
545  record.clock = clock;
546 
547  // if we need to update the global clock, then update it
548  if (clock > this->clock_)
549  this->clock_ = clock;
550 
551  if (result == 1)
552  {
553  // we have a situation where the value needs to be changed
554  record.set_value(value);
555  record.clock = clock_;
556  record.set_toi(utility::get_time());
557 
558  mark_and_signal(&*found, settings);
559  }
560 
561  // value was changed
562  return result;
563 }
564 
570  const std::string& value, uint32_t quality, uint64_t clock,
571  const KnowledgeUpdateSettings& settings)
572 {
573  int result = 1;
574 
575  // enter the mutex
576  std::string key_actual;
577  const std::string* key_ptr;
578  MADARA_GUARD_TYPE guard(mutex_);
579 
580  if (settings.expand_variables)
581  {
582  key_actual = expand_statement(key);
583  key_ptr = &key_actual;
584  }
585  else
586  key_ptr = &key;
587 
588  // check for null key
589  if (*key_ptr == "")
590  return -1;
591 
592  // find the key in the knowledge base
593  KnowledgeMap::iterator found = map_.find(*key_ptr);
594 
595  // if it's found, then compare the value
596  if (!settings.always_overwrite && found != map_.end())
597  {
598  // setup a rhs
599  KnowledgeRecord rhs;
600  rhs.set_value(value);
601 
602  // if we do not have enough quality to update the variable
603  // return -2
604  if (quality < found->second.quality)
605  result = -2;
606 
607  // if we have the same quality, but our clock value
608  // is less than what we've already seen, then return -3
609  else if (quality == found->second.quality && clock < found->second.clock)
610  result = -3;
611 
612  // check for value already set
613  else if (found->second == rhs)
614  result = 0;
615  }
616  else
617  {
618  auto ret = map_.emplace(std::piecewise_construct,
619  std::forward_as_tuple(*key_ptr), std::make_tuple());
620  found = ret.first;
621  }
622 
623  KnowledgeRecord& record = found->second;
624 
625  // if we need to update quality, then update it
626  if (result != -2 && record.quality != quality)
627  record.quality = quality;
628 
629  // if we need to update the variable clock, then update it
630  if (clock > record.clock)
631  record.clock = clock;
632 
633  // if we need to update the global clock, then update it
634  if (clock > this->clock_)
635  this->clock_ = clock;
636 
637  if (result == 1)
638  {
639  // we have a situation where the value needs to be changed
640  record.set_value(value);
641  record.clock = clock_;
642  record.set_toi(utility::get_time());
643 
644  // otherwise set the value
645  mark_and_signal(&*found, settings);
646  }
647  // value was changed
648  return result;
649 }
650 
656  const knowledge::KnowledgeRecord& rhs,
657  const KnowledgeUpdateSettings& settings)
658 {
659  int result = 1;
660 
661  // enter the mutex
662  std::string key_actual;
663  const std::string* key_ptr;
664  MADARA_GUARD_TYPE guard(mutex_);
665 
666  if (settings.expand_variables)
667  {
668  key_actual = expand_statement(key);
669  key_ptr = &key_actual;
670  }
671  else
672  key_ptr = &key;
673 
674  // check for null key
675  if (*key_ptr == "")
676  return -1;
677 
678  // find the key in the knowledge base
679  KnowledgeMap::iterator found = map_.find(*key_ptr);
680 
681  // if it's found, then compare the value
682  if (!settings.always_overwrite && found != map_.end())
683  {
684  // if we do not have enough quality to update the variable
685  // return -2
686  if (rhs.quality < found->second.quality)
687  return -2;
688 
689  // if we have the same quality, but our clock value
690  // is less than what we've already seen, then return -3
691  else if (rhs.quality == found->second.quality &&
692  rhs.clock < found->second.clock)
693  return -3;
694 
695  // if we reach this point, then the record is safe to copy
696  found->second.set_full(rhs);
697 
698  mark_and_signal(&*found, settings);
699  }
700  else
701  {
702  // if we reach this point, then we have to create the record
703  auto ret = map_.emplace(std::piecewise_construct,
704  std::forward_as_tuple(*key_ptr), std::forward_as_tuple(rhs));
705  found = ret.first;
706 
707  if (!ret.second)
708  {
709  ret.first->second = rhs;
710  }
711 
712  mark_and_signal(&*found, settings);
713  }
714 
715  // if we need to update the global clock, then update it
716  if (rhs.clock >= this->clock_)
717  this->clock_ = rhs.clock + 1;
718 
719  // if (settings.signal_changes)
720  // changed_.MADARA_CONDITION_NOTIFY_ONE ();
721 
722  // value was changed
723  return result;
724 }
725 
731  const VariableReference& target, const knowledge::KnowledgeRecord& rhs,
732  const KnowledgeUpdateSettings& settings)
733 {
734  int result = 1;
735 
736  MADARA_GUARD_TYPE guard(mutex_);
737  auto record = target.get_record_unsafe();
738 
739  // if it's found, then compare the value
740  if (!settings.always_overwrite && target.is_valid())
741  {
742  // if we do not have enough quality to update the variable
743  // return -2
744  if (rhs.quality < record->quality)
745  result = -2;
746 
747  // if we have the same quality, but our clock value
748  // is less than what we've already seen, then return -3
749  else if (rhs.quality == record->quality && rhs.clock < record->clock)
750  result = -3;
751 
752  // if we reach this point, then the record is safe to copy
753  record->set_full(rhs);
754 
755  mark_and_signal(target, settings);
756  }
757 
758  // if we need to update the global clock, then update it
759  if (rhs.clock >= this->clock_)
760  this->clock_ = rhs.clock + 1;
761 
762  // value was changed
763  return result;
764 }
765 
770 {
771  changed_.MADARA_CONDITION_NOTIFY_ONE();
772 }
773 
774 // print all variables and their values
775 void ThreadSafeContext::print(unsigned int level) const
776 {
777  MADARA_GUARD_TYPE guard(mutex_);
778  for (KnowledgeMap::const_iterator i = map_.begin(); i != map_.end(); ++i)
779  {
780  if (i->second.exists())
781  {
782  madara_logger_checked_ptr_log(logger_, (int)level, "%s=%s\n", i->first.c_str(),
783  i->second.to_string(", ").c_str());
784  }
785  }
786 }
787 
788 // print all variables and their values
790  const std::string& array_delimiter, const std::string& record_delimiter,
791  const std::string& key_val_delimiter) const
792 {
793  MADARA_GUARD_TYPE guard(mutex_);
794  std::stringstream buffer;
795 
796  bool first = true;
797 
798  for (KnowledgeMap::const_iterator i = map_.begin(); i != map_.end(); ++i)
799  {
800  // separate each record with the record_delimiter
801  if (!first)
802  {
803  buffer << record_delimiter;
804  }
805 
806  buffer << i->first;
807 
808  // separate the key/value pairing with the key_val_delimiter
809  buffer << key_val_delimiter;
810 
811  if (i->second.is_string_type())
812  {
813  buffer << "'";
814  }
815  else if (i->second.type() == i->second.DOUBLE_ARRAY ||
816  i->second.type() == i->second.INTEGER_ARRAY)
817  {
818  buffer << "[";
819  }
820 
821  // use the array_delimiter for the underlying to_string functions
822  buffer << i->second.to_string(array_delimiter);
823 
824  if (i->second.is_string_type())
825  {
826  buffer << "'";
827  }
828  else if (i->second.type() == i->second.DOUBLE_ARRAY ||
829  i->second.type() == i->second.INTEGER_ARRAY)
830  {
831  buffer << "]";
832  }
833 
834  if (first)
835  first = false;
836  }
837 
838  target = buffer.str();
839 }
840 
845  const std::string& statement) const
846 {
847  // enter the mutex
848  MADARA_GUARD_TYPE guard(mutex_);
849 
850  // vectors for holding parsed tokens and pivot_list
851  size_t subcount = 0;
852  size_t begin_exp = 0;
853 
854  std::stringstream builder;
855 
856  // iterate over the input string
857  for (std::string::size_type i = 0; i < statement.size(); ++i)
858  {
859  // if this is an open brace, increase the subcount
860  if (statement[i] == '{')
861  {
862  ++subcount;
863  if (subcount == 1)
864  begin_exp = i;
865  }
866  // closed brace should decrease subcount
867  else if (statement[i] == '}')
868  {
869  if (subcount == 1)
870  {
871  std::string expandable =
872  statement.substr(begin_exp + 1, i - begin_exp - 1);
873  std::string results = this->expand_statement(expandable);
874  builder << this->get(results);
875  }
876  --subcount;
877  }
878  // otherwise, if this subcount is 0, then we need to add it to our output
879  // we allow anything not in subcount == 0 to be handled through recursion
880  else
881  {
882  if (subcount == 0)
883  builder << statement[i];
884  }
885  }
886 
887  // check to see if all brace counts are appropriate
888  if (subcount != 0)
889  {
891  "KARL COMPILE ERROR : Improperly matched braces in %s\n",
892  statement.c_str());
893  }
894 
895  return builder.str();
896 }
897 
898 #ifndef _MADARA_NO_KARL_
899 
900 // defines a function by name
903  const KnowledgeReferenceSettings& settings)
904 {
905  // enter the mutex
906  std::string key_actual;
907  const std::string* key_ptr;
908  MADARA_GUARD_TYPE guard(mutex_);
909 
910  if (settings.expand_variables)
911  {
912  key_actual = expand_statement(name);
913  key_ptr = &key_actual;
914  }
915  else
916  key_ptr = &name;
917 
918  // check for null key
919  if (*key_ptr == "")
920  return;
921 
922  functions_[*key_ptr] = Function(func);
923 }
924 
927  const char* name, FunctionArguments&, Variables&),
928  const KnowledgeReferenceSettings& settings)
929 {
930  // enter the mutex
931  std::string key_actual;
932  const std::string* key_ptr;
933  MADARA_GUARD_TYPE guard(mutex_);
934 
935  if (settings.expand_variables)
936  {
937  key_actual = expand_statement(name);
938  key_ptr = &key_actual;
939  }
940  else
941  key_ptr = &name;
942 
943  // check for null key
944  if (*key_ptr == "")
945  return;
946 
947  functions_[*key_ptr] = Function(func);
948 }
949 
950 #ifdef _MADARA_JAVA_
952  jobject callable, const KnowledgeReferenceSettings& settings)
953 {
954  // enter the mutex
955  std::string key_actual;
956  const std::string* key_ptr;
957  MADARA_GUARD_TYPE guard(mutex_);
958 
959  if (settings.expand_variables)
960  {
961  key_actual = expand_statement(name);
962  key_ptr = &key_actual;
963  }
964  else
965  key_ptr = &name;
966 
967  // check for null key
968  if (*key_ptr == "")
969  return;
970 
971  functions_[*key_ptr] = Function(callable);
972 }
973 #endif
974 
975 #ifdef _MADARA_PYTHON_CALLBACKS_
977  boost::python::object callable, const KnowledgeReferenceSettings& settings)
978 {
979  // enter the mutex
980  std::string key_actual;
981  const std::string* key_ptr;
982  MADARA_GUARD_TYPE guard(mutex_);
983 
984  if (settings.expand_variables)
985  {
986  key_actual = expand_statement(name);
987  key_ptr = &key_actual;
988  }
989  else
990  key_ptr = &name;
991 
992  // check for null key
993  if (*key_ptr == "")
994  return;
995 
996  functions_[*key_ptr] = Function(callable);
997 }
998 
999 #endif
1000 
1002  const std::string& expression, const KnowledgeReferenceSettings& settings)
1003 {
1004  CompiledExpression compiled = compile(expression);
1005  define_function(name, compiled, settings);
1006 }
1007 
1009  const CompiledExpression& expression,
1010  const KnowledgeReferenceSettings& settings)
1011 {
1012 #ifndef _MADARA_NO_KARL_
1013  // enter the mutex
1014  std::string key_actual;
1015  const std::string* key_ptr;
1016  MADARA_GUARD_TYPE guard(mutex_);
1017 
1018  if (settings.expand_variables)
1019  {
1020  key_actual = expand_statement(name);
1021  key_ptr = &key_actual;
1022  }
1023  else
1024  key_ptr = &name;
1025 
1026  // check for null key
1027  if (*key_ptr == "")
1028  return;
1029 
1030  functions_[*key_ptr] = Function(expression.expression);
1031 #endif
1032 }
1033 
1035  const std::string& name, const KnowledgeReferenceSettings& settings)
1036 {
1037  // enter the mutex
1038  std::string key_actual;
1039  const std::string* key_ptr;
1040  MADARA_GUARD_TYPE guard(mutex_);
1041 
1042  if (settings.expand_variables)
1043  {
1044  key_actual = expand_statement(name);
1045  key_ptr = &key_actual;
1046  }
1047  else
1048  key_ptr = &name;
1049 
1050  // check for null key
1051  if (*key_ptr == "")
1052  return 0;
1053 
1054  return &functions_[*key_ptr];
1055 }
1056 
1058 {
1060  "ThreadSafeContext::compile:"
1061  " compiling %s\n",
1062  expression.c_str());
1063 
1064  MADARA_GUARD_TYPE guard(mutex_);
1065  CompiledExpression ce;
1066  ce.logic = expression;
1067  ce.expression = interpreter_->interpret(*this, expression);
1068 
1069  return ce;
1070 }
1071 
1073  CompiledExpression expression, const KnowledgeUpdateSettings& settings)
1074 {
1075  MADARA_GUARD_TYPE guard(mutex_);
1076  return expression.expression.evaluate(settings);
1077 }
1078 
1080  expression::ComponentNode* root, const KnowledgeUpdateSettings& settings)
1081 {
1082  MADARA_GUARD_TYPE guard(mutex_);
1083  if (root)
1084  return root->evaluate(settings);
1085  else
1087 }
1088 
1089 #endif // _MADARA_NO_KARL_
1090 
1092  unsigned int start, unsigned int end, std::vector<KnowledgeRecord>& target)
1093 {
1094  target.clear();
1095 
1096  // enter the mutex
1097  MADARA_GUARD_TYPE guard(mutex_);
1098 
1099  if (end >= start)
1100  {
1101  target.resize(end - start + 1);
1102 
1103  for (unsigned int i = 0; start <= end; ++start, ++i)
1104  {
1105  std::stringstream buffer;
1106  buffer << subject;
1107  buffer << start;
1108  target[i] = get(buffer.str());
1109  }
1110  }
1111 
1112  return target.size();
1113 }
1114 
1115 size_t ThreadSafeContext::to_map(const std::string& expression,
1116  std::map<std::string, knowledge::KnowledgeRecord>& target)
1117 {
1118  target.clear();
1119 
1120  std::string subject(expression);
1121  bool matches_found(false);
1122 
1123  // remove the wildcard and make this into a subject
1124  if (subject[subject.size() - 1] == '*')
1125  subject.resize(subject.size() - 1);
1126 
1127  // just in case a string implementation does not inline
1128  std::string::size_type subject_size = subject.size();
1129  const char* subject_ptr = subject.c_str();
1130 
1131  // enter the mutex
1132  MADARA_GUARD_TYPE guard(mutex_);
1133 
1134  // if expression is blank, assume the user wants all variables
1135  if (expression.size() == 0)
1136  target = map_;
1137  else
1138  {
1139  for (KnowledgeMap::iterator i = map_.begin(); i != map_.end(); ++i)
1140  {
1141  if (i->first.size() >= subject_size)
1142  {
1143  int result = strncmp(i->first.c_str(), subject_ptr, subject_size);
1144  if (result == 0)
1145  {
1146  // we have a match, add this to the map
1147  target[i->first] = i->second;
1148  matches_found = true;
1149  }
1150  else if (matches_found)
1151  {
1152  // we have already found matches, and now we're not seeing matches
1153  break;
1154  }
1155  }
1156  }
1157  }
1158 
1159  return target.size();
1160 }
1161 
1163  const std::string& suffix, VariableReferences& matches)
1164 {
1165  // get the first thing that either matches the prefix or is just after it
1166  KnowledgeMap::iterator i = map_.lower_bound(prefix);
1167 
1168  // if we have a valid prefix, then there is more to do
1169  if (i != map_.end())
1170  {
1171  // keep track of the beginning as we're going to iterate twice
1172  KnowledgeMap::iterator first_match = i;
1173  KnowledgeMap::iterator after_matches = map_.end();
1174  VariableReferences::iterator match;
1175 
1176  size_t num_matches = 0;
1177 
1178  size_t prefix_length = prefix.length();
1179 
1180  // Iterate over all of the prefix matches
1181  while (i != map_.end() && i->first.compare(0, prefix_length, prefix) == 0)
1182  {
1183  ++i;
1184  if (suffix == "" || utility::ends_with(i->first, suffix))
1185  {
1186  ++num_matches;
1187  }
1188  }
1189 
1190  // save the end point so we can do fewer checks
1191  after_matches = i;
1192 
1193  // resize the matches to the appropriate size
1194  matches.resize(num_matches);
1195 
1196  // now, instead of many resizes, we are just going to resize once and set
1197  i = first_match;
1198  num_matches = 0;
1199 
1200  match = matches.begin();
1201 
1202  // Reiterate
1203  while (i != after_matches)
1204  {
1205  if (suffix == "" || utility::ends_with(i->first, suffix))
1206  {
1207  match->assign(&*i);
1208  ++match;
1209  }
1210  ++i;
1211  }
1212  }
1213  else
1214  {
1215  matches.clear();
1216  }
1217 }
1218 
1220  const std::string& delimiter, const std::string& suffix,
1221  std::vector<std::string>& next_keys,
1222  std::map<std::string, knowledge::KnowledgeRecord>& result, bool just_keys)
1223 {
1224  // clear the user provided maps
1225  next_keys.clear();
1226  result.clear();
1227 
1228  // loop tracking for optimizations
1229  bool matches_found(false);
1230  std::string last_key("");
1231 
1232  // enter the mutex
1233  MADARA_GUARD_TYPE guard(mutex_);
1234 
1235  KnowledgeMap::iterator i = map_.begin();
1236 
1237  if (prefix != "")
1238  {
1239  i = map_.lower_bound(prefix);
1240  }
1241 
1242  for (; i != map_.end(); ++i)
1243  {
1244  // if the prefix doesn't match
1245  if (prefix != "" && !utility::begins_with(i->first, prefix))
1246  {
1247  // if we had previously matched a prefix, we're done
1248  if (matches_found)
1249  {
1250  break;
1251  }
1252  }
1253  // we have a prefix match
1254  else
1255  {
1256  // set matches found if it hasn't been set previously
1257  if (!matches_found)
1258  {
1259  matches_found = true;
1260  }
1261 
1262  // if the suffix is provided and doesn't match, continue
1263  if (suffix != "" && !utility::ends_with(i->first, suffix))
1264  {
1265  continue;
1266  }
1267 
1268  if (!just_keys)
1269  {
1270  // the key is safe to add to the master map
1271  result[i->first] = i->second;
1272  }
1273 
1274  // determine if there is a next key in the hierarchy
1275  size_t prefix_end = prefix.length() + delimiter.length();
1276 
1277  std::string current_delimiter =
1278  i->first.substr(prefix.length(), delimiter.length());
1279 
1280  if (current_delimiter == delimiter && i->first.length() > prefix_end)
1281  {
1282  // find the end of the sub key
1283  size_t key_end = i->first.find(delimiter, prefix_end);
1284 
1285  // if we haven't seen the subkey, add it
1286  std::string current_key(
1287  i->first.substr(prefix_end, key_end - prefix_end));
1288  if (current_key != last_key)
1289  {
1290  next_keys.push_back(current_key);
1291  last_key = current_key;
1292  }
1293  }
1294  }
1295  }
1296 
1297  return result.size();
1298 }
1299 
1301  const std::string& prefix, const KnowledgeReferenceSettings&)
1302 {
1303  // enter the mutex
1304  MADARA_GUARD_TYPE guard(mutex_);
1305 
1306  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator> iters(
1307  get_prefix_range(prefix));
1308 
1309  map_.erase(iters.first, iters.second);
1310 
1311  {
1312  // check the changed map
1313  std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1314  changed(changed_map_.lower_bound(prefix.c_str()), changed_map_.end());
1315 
1316  // does our lower bound actually contain the prefix?
1317  if (prefix.compare(0, prefix.size(), changed.first->first, prefix.size()) ==
1318  0)
1319  {
1320  changed.second = changed.first;
1321 
1322  // until we find an entry that does not begin with prefix, loop
1323  for (++changed.second; (prefix.compare(0, prefix.size(),
1324  changed.second->first, prefix.size()) == 0) &&
1325  changed.second != changed_map_.end();
1326  ++changed.second)
1327  {
1328  }
1329 
1330  changed_map_.erase(changed.first, changed.second);
1331  }
1332  }
1333 
1334  {
1335  // check the local changed map
1336  std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1337  local_changed(local_changed_map_.lower_bound(prefix.c_str()),
1338  local_changed_map_.end());
1339 
1340  // does our lower bound actually contain the prefix?
1341  if (prefix.compare(
1342  0, prefix.size(), local_changed.first->first, prefix.size()) == 0)
1343  {
1344  local_changed.second = local_changed.first;
1345 
1346  // until we find an entry that does not begin with prefix, loop
1347  for (++local_changed.second;
1348  (prefix.compare(0, prefix.size(), local_changed.second->first,
1349  prefix.size()) == 0) &&
1350  local_changed.second != local_changed_map_.end();
1351  ++local_changed.second)
1352  {
1353  }
1354 
1355  local_changed_map_.erase(local_changed.first, local_changed.second);
1356  }
1357  }
1358 }
1359 
1360 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1362 {
1363  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator> ret(
1364  map_.begin(), map_.end());
1365 
1366  // If prefix is empty string, copy entire map
1367  if (prefix.size() > 0)
1368  {
1369  ssize_t psz = prefix.size();
1370 
1371  // Find first element >= prefix; i.e., first match or first with that prefix
1372  ret.second = ret.first = map_.lower_bound(prefix);
1373 
1374  // Advance e until it is just past last element with prefix (or at end)
1375  while (ret.second != map_.end() &&
1376  ret.second->first.compare(0, psz, prefix) == 0)
1377  ++ret.second;
1378  }
1379  return ret;
1380 }
1381 
1382 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1384 {
1385  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> ret(
1386  map_.begin(), map_.end());
1387 
1388  // If prefix is empty string, copy entire map
1389  if (prefix.size() > 0)
1390  {
1391  ssize_t psz = prefix.size();
1392 
1393  // Find first element >= prefix; i.e., first match or first with that prefix
1394  ret.second = ret.first = map_.lower_bound(prefix);
1395 
1396  // Advance e until it is just past last element with prefix (or at end)
1397  while (ret.second != map_.end() &&
1398  ret.second->first.compare(0, psz, prefix) == 0)
1399  ++ret.second;
1400  }
1401  return ret;
1402 }
1403 
1405 {
1406  // enter the mutex
1407  MADARA_GUARD_TYPE guard(mutex_);
1408 
1409  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1410  get_prefix_range(prefix));
1411 
1412  // RVO should avoid copying this map
1413  return KnowledgeMap(iters.first, iters.second);
1414 }
1415 
1417 {
1418  // enter the mutex
1419  MADARA_GUARD_TYPE guard(mutex_);
1420 
1421  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1422  get_prefix_range(prefix));
1423 
1424  // NRVO should avoid copying this map
1425  KnowledgeMap ret;
1426  for (; iters.first != iters.second; ++iters.first)
1427  {
1428  ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1429  iters.first->second);
1430  }
1431  return ret;
1432 }
1433 
1435  const KnowledgeRequirements& reqs, const KnowledgeUpdateSettings& settings)
1436 {
 // Copies records from `source` into this context's map_ as directed by
 // `reqs`, calling mark_modified(key, settings) for every key it handles:
 //  - reqs.clear_knowledge: map_ is cleared first.
 //  - reqs.predicates non-empty: for each predicate, the entries of
 //    source.get_prefix_range(predicate.prefix) are inserted or overwritten;
 //    a non-empty predicate.suffix additionally requires ends_with().
 //  - no predicates: every entry of source.map_ is inserted.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
 // NOTE(review): no guard on mutex_ (of either context) is taken in this
 // block; presumably callers hold the locks -- confirm.
1438  "ThreadSafeContext::copy:"
1439  " copying a context\n");
1440
1441  if (reqs.clear_knowledge)
1442  {
1444  "ThreadSafeContext::copy:"
1445  " clearing knowledge in target context\n");
1446
1447  map_.clear();
1448  }
1449
1450  if (reqs.predicates.size() != 0)
1451  {
 // `predicate` is taken by value, so each element is copied per iteration
1452  for (auto predicate : reqs.predicates)
1453  {
 // candidate entries in source: everything that starts with the prefix
1454  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1455  iters(source.get_prefix_range(predicate.prefix));
1456
 // prefix-only predicate: every candidate is copied
1457  if (predicate.suffix == "")
1458  {
1460  "ThreadSafeContext::copy:"
1461  " matching predicate.prefix=%s\n",
1462  predicate.prefix.c_str());
1463
1464  for (; iters.first != iters.second; ++iters.first)
1465  {
1467  "ThreadSafeContext::copy:"
1468  " looking for %s\n",
1469  iters.first->first.c_str());
1470
 // lower_bound gives either the existing entry or the insertion hint
1471  auto where = map_.lower_bound(iters.first->first);
1472
1473  if (where == map_.end() || where->first != iters.first->first)
1474  {
1476  "ThreadSafeContext::copy:"
1477  " inserting %s\n",
1478  iters.first->first.c_str());
1479
1480  where = map_.emplace_hint(
1481  where, iters.first->first, iters.first->second);
1482  }
1483  else
1484  {
1486  "ThreadSafeContext::copy:"
1487  " overwriting %s\n",
1488  iters.first->first.c_str());
1489
1490  where->second = iters.first->second;
1491  }
1492
1493  mark_modified(iters.first->first, settings);
1494  }
1495  }
1496  else // we need to match a suffix
1497  {
1499  "ThreadSafeContext::copy:"
1500  " matching predicate.suffix=%s\n",
1501  predicate.suffix.c_str());
1502
 // same insert-or-overwrite logic as above, applied only to keys that
 // also end with the requested suffix
1503  for (; iters.first != iters.second; ++iters.first)
1504  {
1505  if (madara::utility::ends_with(iters.first->first, predicate.suffix))
1506  {
1508  "ThreadSafeContext::copy:"
1509  " looking for %s\n",
1510  iters.first->first.c_str());
1511
1512  auto where = map_.lower_bound(iters.first->first);
1513
1514  if (where == map_.end() || where->first != iters.first->first)
1515  {
1517  "ThreadSafeContext::copy:"
1518  " inserting %s\n",
1519  iters.first->first.c_str());
1520
1521  where = map_.emplace_hint(
1522  where, iters.first->first, iters.first->second);
1523  }
1524  else
1525  {
1527  "ThreadSafeContext::copy:"
1528  " overwriting %s\n",
1529  iters.first->first.c_str());
1530
1531  where->second = iters.first->second;
1532  }
1533
1534  mark_modified(iters.first->first, settings);
1535  } // end suffix match
1536  }
1537  }
1538  }
1539  }
1540  // we need to insert everything from source into this
1541  else
1542  {
1543  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator> iters(
1544  source.map_.begin(), source.map_.end());
1545
1546  for (; iters.first != iters.second; ++iters.first)
1547  {
 // NOTE(review): a std::map insert leaves an already-present key
 // untouched, so unlike the predicate branches this does not overwrite
 // existing values, yet the key is still marked modified -- confirm that
 // this difference is intended.
1548  map_.insert(map_.begin(),
1549  KnowledgeMap::value_type(iters.first->first, iters.first->second));
1550
1551  mark_modified(iters.first->first, settings);
1552  }
1553  }
1554 }
1555 
1557  const CopySet& copy_set, bool clean_copy,
1558  const KnowledgeUpdateSettings& settings)
1559 {
1560  // if we need to clean first, clear the map
1561  if (clean_copy)
1562  map_.clear();
1563 
1564  // if the copy set is empty, copy everything
1565  if (copy_set.size() == 0)
1566  {
1567  for (KnowledgeMap::const_iterator i = source.map_.begin();
1568  i != source.map_.end(); ++i)
1569  {
1570  map_[i->first] = (i->second);
1571  mark_modified(i->first, settings);
1572  }
1573  }
1574  else
1575  {
1576  // we have a copy set, so only copy what the user asked for
1577  for (CopySet::const_iterator key = copy_set.begin(); key != copy_set.end();
1578  ++key)
1579  {
1580  // check source for existence of the current copy set key
1581  KnowledgeMap::const_iterator i = source.map_.find(key->first);
1582 
1583  // if found, make a copy of the found entry
1584  if (i != source.map_.end())
1585  {
1586  map_[i->first] = (i->second);
1587  mark_modified(i->first, settings);
1588  }
1589  }
1590  }
1591 }
1592 
1594  const std::string& filename, const std::string& id) const
1595 {
1596  CheckpointSettings settings;
1597  settings.filename = filename;
1598  settings.originator = id;
1599 
1600  return save_context(settings);
1601 }
1602 
1604  const CheckpointSettings& settings) const
1605 {
 // Writes every existing record of this context (optionally restricted to
 // settings.prefixes) to settings.filename as a single binary checkpoint:
 // a FileHeader, then a transport::MessageHeader, then the encoded records.
 // Returns meta.size (set from the value returned by settings.encode) on
 // success, or -1 if the file cannot be opened. Throws
 // exceptions::FilterException when settings.encode returns a negative size.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
1607  "ThreadSafeContext::save_context:"
1608  " opening file %s\n",
1609  settings.filename.c_str());
1610
1611  // int64_t total_written (0);
1612  FILE* file = fopen(settings.filename.c_str(), "wb");
1613
1614  FileHeader meta;
1615  meta.states = 1;
 // the length passed is the smaller of the destination array size and the
 // originator length plus its terminator
1616  utility::strncpy_safe(meta.originator, settings.originator.c_str(),
1617  sizeof(meta.originator) < settings.originator.size() + 1
1618  ? sizeof(meta.originator)
1619  : settings.originator.size() + 1);
1620
1621  transport::MessageHeader checkpoint_header;
1622
1623  if (file)
1624  {
1625  int64_t max_buffer(settings.buffer_size);
1626  int64_t buffer_remaining(max_buffer);
1627
1629  "ThreadSafeContext::save_context:"
1630  " allocating %d byte buffer\n",
1631  (int)max_buffer);
1632
 // the whole checkpoint is assembled in this buffer before a single fwrite
1633  utility::ScopedArray<char> buffer = new char[max_buffer];
1634
1635  char* current = buffer.get_ptr();
1636
1638  "ThreadSafeContext::save_context:"
1639  " generating file meta\n");
1640
1641  meta.size += checkpoint_header.encoded_size();
1642  checkpoint_header.size = checkpoint_header.encoded_size();
1643
1644  if (settings.override_timestamp)
1645  {
1646  meta.initial_timestamp = settings.initial_timestamp;
1647  meta.last_timestamp = settings.last_timestamp;
1648  }
1649
 // first pass over both headers: this advances `current` past them; both
 // are written again further down once the final sizes are known
1650  current = meta.write(current, buffer_remaining);
1651
1652  if (settings.override_lamport)
1653  {
1654  checkpoint_header.clock = settings.initial_lamport_clock;
1655  }
1656  else
1657  {
 // NOTE(review): clock_ is read here before mutex_ is acquired below
1658  checkpoint_header.clock = clock_;
1659  }
1660
1661  current = checkpoint_header.write(current, buffer_remaining);
1662
1664  "ThreadSafeContext::save_context:"
1665  " writing records\n");
1666
1667  // lock the context
1668  MADARA_GUARD_TYPE guard(mutex_);
1669
1670  for (KnowledgeMap::const_iterator i = map_.begin(); i != map_.end(); ++i)
1671  {
1672  if (i->second.exists())
1673  {
1674  // check if the prefix is allowed
1675  if (settings.prefixes.size() > 0)
1676  {
1678  "ThreadSafeContext::save_context:"
1679  " we have %d prefixes to check against.\n",
1680  (int)settings.prefixes.size());
1681
 // stop scanning as soon as one allowed prefix matches
1682  bool prefix_found = false;
1683  for (size_t j = 0; j < settings.prefixes.size() && !prefix_found; ++j)
1684  {
1686  "ThreadSafeContext::save_context:"
1687  " checking record %s against prefix %s.\n",
1688  i->first.c_str(), settings.prefixes[j].c_str());
1689
1690  if (madara::utility::begins_with(i->first, settings.prefixes[j]))
1691  {
1693  "ThreadSafeContext::save_context:"
1694  " record has the correct prefix.\n");
1695
1696  prefix_found = true;
1697  }
1698  }
1699
1700  if (!prefix_found)
1701  {
1703  "ThreadSafeContext::save_context:"
1704  " record has the wrong prefix. Rejected.\n");
1705
1706  continue;
1707  }
1708  }
1709
 // NOTE(review): no check of the record's encoded size against
 // buffer_remaining is made in this block before writing; what happens
 // when the buffer is too small depends on KnowledgeRecord::write --
 // confirm.
1710  auto pre_write = current;
1711  current = i->second.write(current, i->first, buffer_remaining);
 // bytes actually produced for this record
1712  size_t encoded_size = current - pre_write;
1713
1714  ++checkpoint_header.updates;
1715  // meta.size += encoded_size;
1716  checkpoint_header.size += encoded_size;
1717  }
1718  }
1719
1720  // write the final sizes
 // (re-writes the checkpoint header in place, directly after the file header)
1721  current = checkpoint_header.write(
1722  buffer.get_ptr() + (int)FileHeader::encoded_size(), max_buffer);
1723
1724  // run the encode step of any buffer filters over everything after the
 // file header
1725  int total =
1726  settings.encode(buffer.get_ptr() + (int)FileHeader::encoded_size(),
1727  (int)checkpoint_header.size, (int)max_buffer);
1728
1729  if (total < 0)
1730  {
 // NOTE(review): fclose(file) is not reached when this throws, so the
 // FILE* stays open on this path.
1731  throw exceptions::FilterException("ThreadSafeContext::save_context: "
1732  "encode() returned -1 size. Cannot "
1733  "save a negative sized checkpoint.");
1734  }
1735
1736  // if (settings.buffer_filters.size () > 0)
1737  // {
1738  // total += (int)filters::BufferFilterHeader::encoded_size ();
1739  // }
1740
 // the file header records the size of the (possibly filtered) payload
1741  meta.size = (uint64_t)total;
1742
1743  current = meta.write(buffer.get_ptr(), max_buffer);
1744
1745  // update the meta data at the front
1746  fseek(file, 0, SEEK_SET);
1747
1749  "ThreadSafeContext::save_context:"
1750  " encoding with buffer filters: %d:%d bytes written to offset %d.\n",
1751  (int)meta.size, (int)checkpoint_header.size,
1752  (int)FileHeader::encoded_size());
1753
 // file header plus payload go out as one item; the fwrite result is not
 // checked
1754  fwrite(buffer.get_ptr(), (size_t)total + (size_t)FileHeader::encoded_size(),
1755  1, file);
1756
1757  fclose(file);
1758  }
1759  else
1760  {
1762  "ThreadSafeContext::save_context:"
1763  " couldn't open context file: %s.\n",
1764  settings.filename.c_str());
1765
1766  return -1;
1767  }
1768
1769  return meta.size;
1770 }
1771 
1772 int64_t ThreadSafeContext::save_as_karl(const std::string& filename) const
1773 {
1774  CheckpointSettings settings;
1775  settings.filename = filename;
1776 
1777  return save_as_karl(settings);
1778 }
1779 
1781  const CheckpointSettings& settings) const
1782 {
 // Serializes every existing record (optionally restricted to
 // settings.prefixes) as text statements of the form `name=value;` and
 // writes the result to settings.filename. Returns the number of bytes
 // written, or -1 if the file cannot be opened.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
1784  "ThreadSafeContext::save_as_karl:"
1785  " opening file %s\n",
1786  settings.filename.c_str());
1787
1788  int64_t bytes_written(0);
 // the whole script is assembled in memory before anything hits the file
1789  std::stringstream buffer;
1790  std::ofstream file;
1791  file.open(settings.filename.c_str(), std::ios::binary);
1792
1793  if (file.is_open())
1794  {
1795  // lock the context
1796  MADARA_GUARD_TYPE guard(mutex_);
1797
1798  for (KnowledgeMap::const_iterator i = map_.begin(); i != map_.end(); ++i)
1799  {
1800  if (i->second.exists())
1801  {
1802  // check if the prefix is allowed
1803  if (settings.prefixes.size() > 0)
1804  {
1806  "ThreadSafeContext::save_as_karl:"
1807  " we have %d prefixes to check against.\n",
1808  (int)settings.prefixes.size());
1809
 // stop scanning as soon as one allowed prefix matches
1810  bool prefix_found = false;
1811  for (size_t j = 0; j < settings.prefixes.size() && !prefix_found; ++j)
1812  {
1814  "ThreadSafeContext::save_as_karl:"
1815  " checking record %s against prefix %s.\n",
1816  i->first.c_str(), settings.prefixes[j].c_str());
1817
1818  if (madara::utility::begins_with(i->first, settings.prefixes[j]))
1819  {
1821  "ThreadSafeContext::save_as_karl:"
1822  " the record has the correct prefix.\n");
1823
1824  prefix_found = true;
1825  }
1826  }
1827
1828  if (!prefix_found)
1829  {
1831  "ThreadSafeContext::save_as_karl:"
1832  " the record does not have a correct prefix.\n");
1833
1834  continue;
1835  }
1836  }
1837
1838  buffer << i->first;
1839  buffer << "=";
1840
1841  if (!i->second.is_binary_file_type())
1842  {
1843  // record is a non binary file type
1844  if (i->second.is_string_type())
1845  {
1846  // strings require quotation marks
 // NOTE(review): the value is streamed as-is below; no escaping of
 // embedded quotes is visible in this block.
1847  buffer << "\"";
1848  }
 // NOTE(review): one line of this condition is not visible in this
 // listing; only the DOUBLE_ARRAY comparison can be seen.
1849  else if (i->second.type() ==
1851  i->second.type() == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1852  {
1853  // arrays require brackets
1854  buffer << "[";
1855  }
1856
1857  buffer << i->second;
 // close whatever delimiter was opened before the value
1858  if (i->second.is_string_type())
1859  {
1860  // strings require quotation marks
1861  buffer << "\"";
1862  }
1863  else if (i->second.type() ==
1865  i->second.type() == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1866  {
1867  // arrays require brackets
1868  buffer << "]";
1869  }
1870  }
1871  else
1872  {
 // binary records are referenced through a #read_file ('<path>')
 // expression instead of being inlined
1873  buffer << "#read_file ('";
1874
 // path is <directory of settings.filename, or "." if empty>/<record name>
 // plus ".jpg" for IMAGE_JPEG records and ".dat" for everything else
1875  std::string path = utility::extract_path(settings.filename);
1876
1877  if (path == "")
1878  path = ".";
1879
1880  path += "/";
1881  path += i->first;
1882
1883  if (i->second.type() == knowledge::KnowledgeRecord::IMAGE_JPEG)
1884  {
1885  path += ".jpg";
1886  }
1887  else
1888  {
1889  path += ".dat";
1890  }
1891
 // NOTE(review): the name of the call receiving these arguments is on a
 // line not visible in this listing -- presumably it writes the record's
 // bytes to `path`; confirm.
1893  path, (void*)&(*i->second.file_value_)[0], i->second.size());
1894  buffer << path;
1895
1896  buffer << "')";
1897  }
1898
1899  buffer << ";\n";
1900  }
1901  }
1902
1903  std::string result = buffer.str();
1904
 // with buffer filters configured, the text is copied into a scratch
 // buffer of settings.buffer_size bytes and encoded in place
1905  if (settings.buffer_filters.size() > 0)
1906  {
 // NOTE(review): no delete[] for result_copy appears in this block, so
 // the allocation is not released here.
1907  char* result_copy = new char[settings.buffer_size];
 // NOTE(review): result.size() + 1 bytes are copied with no check that
 // they fit into settings.buffer_size.
1908  memcpy(result_copy, result.c_str(), result.size() + 1);
1909
1910  int size = settings.encode(
1911  result_copy, (int)result.size(), (int)settings.buffer_size);
1912
1913  if (size < 0)
1914  {
 // NOTE(review): the statement that uses this message is on a line not
 // visible in this listing; if it does not leave the function, the
 // write below receives a negative size -- confirm.
1916  "ThreadSafeContext::save_as_karl: "
1917  "encode() returned -1. Incorrect filter.");
1918  }
1919
1920  file.write(result_copy, size);
1921  bytes_written = (int64_t)size;
1922  }
1923  else
1924  {
1925  file.write(result.c_str(), result.size());
1926  bytes_written = (int64_t)result.size();
1927  }
1928
1929  file.close();
1930  }
1931  else
1932  {
1934  "ThreadSafeContext::save_as_karl:"
1935  " couldn't open karl file: %s.\n",
1936  settings.filename.c_str());
1937
1938  return -1;
1939  }
1940
1941  return bytes_written;
1942 }
1943 
1944 int64_t ThreadSafeContext::save_as_json(const std::string& filename) const
1945 {
1946  CheckpointSettings settings;
1947  settings.filename = filename;
1948 
1949  return save_as_json(settings);
1950 }
1951 
1953  const CheckpointSettings& settings) const
1954 {
 // Serializes every existing record (optionally restricted to
 // settings.prefixes) as one `"name" : value` member per line inside a
 // single braces-delimited object and writes it to settings.filename.
 // Returns the length of the generated text, or -1 if the file cannot be
 // opened. settings.buffer_filters is not consulted in this block.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
1956  "ThreadSafeContext::save_as_json:"
1957  " opening file %s\n",
1958  settings.filename.c_str());
1959
1960  int64_t bytes_written(0);
1961
 // the whole document is assembled in memory before anything hits the file
1962  std::stringstream buffer;
1963  std::ofstream file;
 // opened with the default (text) mode
1964  file.open(settings.filename.c_str());
1965
1966  if (file.is_open())
1967  {
1968  // lock the context
1969  MADARA_GUARD_TYPE guard(mutex_);
1970
1971  buffer << "{\n";
1972
1973  for (KnowledgeMap::const_iterator i = map_.begin(); i != map_.end(); ++i)
1974  {
1975  if (i->second.exists())
1976  {
1977  // check if the prefix is allowed
1978  if (settings.prefixes.size() > 0)
1979  {
1981  "ThreadSafeContext::save_as_json:"
1982  " we have %d prefixes to check against.\n",
1983  (int)settings.prefixes.size());
1984
 // stop scanning as soon as one allowed prefix matches
1985  bool prefix_found = false;
1986  for (size_t j = 0; j < settings.prefixes.size() && !prefix_found; ++j)
1987  {
1989  "ThreadSafeContext::save_as_json:"
1990  " checking record %s against prefix %s.\n",
1991  i->first.c_str(), settings.prefixes[j].c_str());
1992
1993  if (madara::utility::begins_with(i->first, settings.prefixes[j]))
1994  {
1996  "ThreadSafeContext::save_as_json:"
1997  " the record has the correct prefix.\n");
1998
1999  prefix_found = true;
2000  }
2001  }
2002
2003  if (!prefix_found)
2004  {
2006  "ThreadSafeContext::save_as_json:"
2007  " the record does not have a correct prefix.\n");
2008
2009  continue;
2010  }
2011  }
2012
 // NOTE(review): the key and any string value are streamed without
 // escaping in this block.
2013  buffer << " \"";
2014  buffer << i->first;
2015  buffer << "\" : ";
2016
2017  if (!i->second.is_binary_file_type())
2018  {
2019  // record is a non binary file type
2020  if (i->second.is_string_type())
2021  {
2022  // strings require quotation marks
2023  buffer << "\"";
2024  }
 // NOTE(review): one line of this condition is not visible in this
 // listing; only the DOUBLE_ARRAY comparison can be seen.
2025  else if (i->second.type() ==
2027  i->second.type() == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2028  {
2029  // arrays require brackets
2030  buffer << "[";
2031  }
2032
2033  buffer << i->second;
 // close whatever delimiter was opened before the value
2034  if (i->second.is_string_type())
2035  {
2036  // strings require quotation marks
2037  buffer << "\"";
2038  }
2039  else if (i->second.type() ==
2041  i->second.type() == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2042  {
2043  // arrays require brackets
2044  buffer << "]";
2045  }
2046  }
2047  else
2048  {
 // NOTE(review): binary records are emitted as an unquoted
 // #read_file ('<path>') expression, which is not a JSON value.
2049  buffer << "#read_file ('";
2050
 // path is <directory of settings.filename, or "." if empty>/<record name>
 // plus ".jpg" for IMAGE_JPEG records and ".dat" for everything else
2051  std::string path = utility::extract_path(settings.filename);
2052
2053  if (path == "")
2054  path = ".";
2055
2056  path += "/";
2057  path += i->first;
2058
2059  if (i->second.type() == knowledge::KnowledgeRecord::IMAGE_JPEG)
2060  {
2061  path += ".jpg";
2062  }
2063  else
2064  {
2065  path += ".dat";
2066  }
2067
 // NOTE(review): the name of the call receiving these arguments is on a
 // line not visible in this listing -- presumably it writes the record's
 // bytes to `path`; confirm.
2069  path, (void*)&(*i->second.file_value_)[0], i->second.size());
2070  buffer << path;
2071
2072  buffer << "')";
2073  }
2074
 // NOTE(review): the separator is emitted whenever any map entry follows,
 // not only when a later entry will actually be written; if the remaining
 // entries do not exist() or are rejected by the prefix filter, the
 // output ends with a trailing comma before the closing brace.
2075  KnowledgeMap::const_iterator j(i);
2076
2077  if (++j != map_.end())
2078  buffer << ",\n";
2079  }
2080  }
2081
2082  buffer << "\n}\n";
2083
2084  std::string result = buffer.str();
2085  file << result;
2086
 // reports the length of the generated text, not a count confirmed by the
 // stream
2087  bytes_written = (int64_t)result.size();
2088
2089  file.close();
2090  }
2091  else
2092  {
2094  "ThreadSafeContext::save_as_json:"
2095  " couldn't open json file: %s.\n",
2096  settings.filename.c_str());
2097
2098  return -1;
2099  }
2100
2101  return bytes_written;
2102 }
2103 
2105  std::string& id, const KnowledgeUpdateSettings& settings)
2106 {
2107  CheckpointSettings checkpoint_settings;
2108  checkpoint_settings.filename = filename;
2109  checkpoint_settings.originator = id;
2110 
2111  return load_context(checkpoint_settings, settings);
2112 }
2113 
2115  FileHeader& meta, const KnowledgeUpdateSettings& settings)
2116 {
 // Reads the leading FileHeader of `filename` into `meta`, then loads the
 // file through load_context(CheckpointSettings&, settings) using default
 // settings apart from the filename. The second step runs whether or not
 // the header could be read; its result is what this function returns.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
2118  "ThreadSafeContext::load_context:"
2119  " opening file %s for just header info\n",
2120  filename.c_str());
2121
2122  // read the initial FileHeader in
2123  FILE* file = fopen(filename.c_str(), "rb");
2124
2125  int64_t total_read(0);
2126
2127  if (file)
2128  {
 // fixed-size scratch buffer; only the header at its start is parsed here
2129  int64_t max_buffer(102800);
2130  int64_t buffer_remaining(max_buffer);
2131
2132  utility::ScopedArray<char> buffer = new char[max_buffer];
2133  const char* current = buffer.get_ptr();
2134
2136  "ThreadSafeContext::load_context:"
2137  " reading file meta data\n");
2138
2139  total_read = fread(buffer.get_ptr(), 1, max_buffer, file);
2140  buffer_remaining = (int64_t)total_read;
2141
 // NOTE(review): the second half of this condition is on a line not
 // visible in this listing.
2142  if (total_read > FileHeader::encoded_size() &&
2144  {
2145  // if there was something in the file, and it was the right header
2146
 // `current` is not used again after this assignment
2147  current = meta.read(current, buffer_remaining);
2148
2149  } // end if the header could be read
2150  else
2151  {
2153  "ThreadSafeContext::load_context:"
2154  " invalid file or wrong version. No contextual change.\n");
2155  }
2156
2157  fclose(file);
2158  }
2159  else
2160  {
2162  "ThreadSafeContext::load_context:"
2163  " could not open file %s for reading. "
2164  "Check that file exists and that permissions are appropriate.\n",
2165  filename.c_str());
2166  }
2167
 // delegate the actual record loading to the settings-based overload
2168  CheckpointSettings checkpoint_settings;
2169  checkpoint_settings.filename = filename;
2170
2171  return load_context(checkpoint_settings, settings);
2172 }
2173 
2175  CheckpointSettings& checkpoint_settings,
2176  const KnowledgeUpdateSettings& update_settings)
2177 {
 // Reads the file named by checkpoint_settings through file_to_string(),
 // compiles the text and returns the result of evaluating it with
 // update_settings. When _MADARA_NO_KARL_ is defined, nothing is read and a
 // default-constructed KnowledgeRecord is returned instead.
 // NOTE(review): the string literals below are the arguments of a call whose
 // opening line is not visible in this listing -- presumably a logging
 // macro; confirm against the real source.
2179  "ThreadSafeContext::evaluate_file:"
2180  " opening file %s\n",
2181  checkpoint_settings.filename.c_str());
2182
2183 #ifndef _MADARA_NO_KARL_
2184
2185  CompiledExpression expression = compile(file_to_string(checkpoint_settings));
2186
2187  return evaluate(expression, update_settings);
2188 #else // if KARL has been disabled
2189  KnowledgeRecord result;
2190  return result;
2191 #endif
2192 }
2193 
2195  CheckpointSettings& checkpoint_settings)
2196 {
 // Reads up to checkpoint_settings.buffer_size bytes of the file named by
 // checkpoint_settings.filename, runs checkpoint_settings.decode over them
 // and returns the decoded bytes as a string. Returns an empty string when
 // the file cannot be opened.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing -- presumably a logging macro; confirm against the real source.
2198  "ThreadSafeContext::file_to_string:"
2199  " opening file %s\n",
2200  checkpoint_settings.filename.c_str());
2201
2202  FILE* file = fopen(checkpoint_settings.filename.c_str(), "rb");
2203
2204  int64_t total_read(0);
2205
 // the context is cleared on request before it is known whether the file
 // could be opened
2206  if (checkpoint_settings.clear_knowledge)
2207  {
2208  this->clear();
2209  }
2210
2211  if (file)
2212  {
 // `meta` is not referenced again in the lines visible here
2213  FileHeader meta;
2214  int64_t max_buffer(checkpoint_settings.buffer_size);
2215
2216  utility::ScopedArray<char> buffer = new char[max_buffer];
2217
 // single read: anything beyond max_buffer bytes is not loaded
2218  total_read = fread(buffer.get_ptr(), 1, max_buffer, file);
2219
2221  "ThreadSafeContext::file_to_string:"
2222  " reading file: %d bytes read. Decoding...\n",
2223  (int)total_read);
2224
2225  // call decode with any buffer filters
2226  int size = checkpoint_settings.decode(
2227  buffer.get_ptr(), (int)total_read, (int)max_buffer);
2228
2229  if (size < 0)
2230  {
 // NOTE(review): the statement that uses this message is on a line not
 // visible in this listing. If it throws, fclose(file) below is not
 // reached; if it does not leave the function, a negative size is cast to
 // size_t when the string is built -- confirm.
2232  "ThreadSafeContext::file_to_string: "
2233  "decode () returned a negative encoding size. Bad filter/encode.");
2234  }
2235
2237  "ThreadSafeContext::file_to_string:"
2238  " decoded %d bytes. Converting to string.\n",
2239  size);
2240
 // exactly `size` bytes are taken, so no terminator is required in buffer
2241  std::string script(buffer.get(), (size_t)size);
2242
2244  "ThreadSafeContext::file_to_string:"
2245  " reading file: %d bytes read.\n",
2246  (int)total_read);
2247
2249  "ThreadSafeContext::file_to_string:"
2250  " file_contents: %s.\n",
2251  script.c_str());
2252
2253  fclose(file);
2254
2255  return script;
2256  }
2257
2258  return std::string();
2259 }
2260 
2262  const KnowledgeUpdateSettings& update_settings)
2263 {
2264  CheckpointReader reader(checkpoint_settings);
2265 
2266  reader.start();
2267 
2268  if (!reader.is_open())
2269  {
2270  return 0;
2271  }
2272 
2273  if (checkpoint_settings.clear_knowledge)
2274  {
2275  this->clear();
2276  }
2277 
2278  for (;;)
2279  {
2280  auto cur = reader.next();
2281  if (cur.first.empty())
2282  {
2283  return reader.get_total_read();
2284  }
2285 
2286  cur.second.clock = clock_;
2287  update_record_from_external(cur.first, cur.second, update_settings);
2288  }
2289 }
2290 
 // Prepares `meta` and `checkpoint_header` for appending a new checkpoint to
 // an existing file: re-reads the FileHeader from the start of `file` into
 // `buffer`, parses it into `meta`, applies the originator / timestamp /
 // lamport-clock overrides from `settings`, and resets checkpoint_header.size
 // to the bare header size. Returns meta.size + FileHeader::encoded_size(),
 // which the code below labels as the spot where the file ends.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing; confirm against the real source.
2291 static uint64_t update_checkpoint_header(logger::Logger* logger_,
2292  uint64_t clock_, const CheckpointSettings& settings, std::fstream& file,
2293  FileHeader& meta, transport::MessageHeader& checkpoint_header,
2294  int64_t& buffer_remaining, utility::ScopedArray<char>& buffer)
2295 {
 // `current` only serves to initialise the read cursor below
2296  char* current = buffer.get_ptr();
2297  const char* meta_reader = current;
2298
2299  // read the meta data at the front
2300  // fseek (file, 0, SEEK_SET);
2301  file.seekg(0, file.beg);
2302  // size_t ret = fread (current, meta.encoded_size (), 1, file);
2303
2304  if (!file.read(buffer.get(), FileHeader::encoded_size()))
2305  {
2307  "ThreadSafeContext::save_checkpoint:"
2308  " failed to read existing file header: size=%d\n",
2309  (int)meta.encoded_size());
2310
 // NOTE(review): the statement that uses this message is not visible in
 // this listing -- presumably an exception is raised here; confirm.
2312  "ThreadSafeContext::save_checkpoint:"
2313  "Checkpoint file appears to have been corrupted. Bad header.");
2314  }
2315
 // decode the header bytes that were just read
2316  meta_reader = meta.read(meta_reader, buffer_remaining);
2317
2319  "ThreadSafeContext::save_checkpoint:"
2320  " init file meta: size=%d, states=%d\n",
2321  (int)meta.size, (int)meta.states);
2322
 // an empty originator in settings keeps the one stored in the file
2323  if (settings.originator != "")
2324  {
2326  "ThreadSafeContext::save_checkpoint:"
2327  " setting file meta id to %s\n",
2328  settings.originator.c_str());
2329
 // the length passed is the smaller of the destination array size and the
 // originator length plus its terminator
2330  utility::strncpy_safe(meta.originator, settings.originator.c_str(),
2331  sizeof(meta.originator) < settings.originator.size() + 1
2332  ? sizeof(meta.originator)
2333  : settings.originator.size() + 1);
2334  }
2335
2336  // save the spot where the file ends
2337  uint64_t checkpoint_start = meta.size + (uint64_t)FileHeader::encoded_size();
2338
 // start from the bare header size
2339  checkpoint_header.size = checkpoint_header.encoded_size();
2340
2342  "ThreadSafeContext::save_checkpoint:"
2343  " meta.size=%d, chkpt.header.size=%d \n",
2344  (int)meta.size, (int)checkpoint_header.size);
2345
2346  if (settings.override_timestamp)
2347  {
2348  meta.initial_timestamp = settings.initial_timestamp;
2349  meta.last_timestamp = settings.last_timestamp;
2350  }
2351
2352  if (settings.override_lamport)
2353  {
2354  checkpoint_header.clock = settings.initial_lamport_clock;
2355  }
2356  else
2357  {
2358  checkpoint_header.clock = clock_;
2359  }
2360
2361  return checkpoint_start;
2362 }
2363 
 // Initialises the headers for a brand-new checkpoint file: adds the
 // checkpoint header's encoded size to meta.size, applies the timestamp and
 // lamport-clock overrides from `settings`, then writes `meta` followed by
 // `checkpoint_header` into the buffer. Returns the pointer just past the
 // two headers.
 // NOTE(review): the last line of the parameter list is not visible in this
 // listing; `buffer`, used below, is presumably declared there -- confirm.
2364 static char* init_checkpoint_header(logger::Logger* logger_, uint64_t clock_,
2365  const CheckpointSettings& settings, FileHeader& meta,
2366  transport::MessageHeader& checkpoint_header, int64_t& buffer_remaining,
2368 {
2369  char* current = buffer.get_ptr();
2370
 // NOTE(review): the string literals below are the arguments of a call whose
 // opening line is not visible in this listing.
2372  "ThreadSafeContext::save_checkpoint:"
2373  " creating file meta. file.meta.size=%d, state.size=%d\n",
2374  (int)meta.size, (int)checkpoint_header.encoded_size());
2375
 // the file payload starts out as just the checkpoint header
2376  meta.size += checkpoint_header.encoded_size();
2377  checkpoint_header.size = checkpoint_header.encoded_size();
2378
2379  if (settings.override_timestamp)
2380  {
2381  meta.initial_timestamp = settings.initial_timestamp;
2382  meta.last_timestamp = settings.last_timestamp;
2383  }
2384
 // file header first ...
2385  current = meta.write(current, buffer_remaining);
2386
2387  if (settings.override_lamport)
2388  {
2389  checkpoint_header.clock = settings.initial_lamport_clock;
2390  }
2391  else
2392  {
2393  checkpoint_header.clock = clock_;
2394  }
2395
 // ... then the checkpoint header directly after it
2396  current = checkpoint_header.write(current, buffer_remaining);
2397
2398  return current;
2399 }
2400 
2402  const std::string& name, const KnowledgeRecord* record,
2403  const CheckpointSettings& settings,
2404  transport::MessageHeader& checkpoint_header, char*& current,
2405  utility::ScopedArray<char>& buffer, int64_t& buffer_remaining)
2406 {
 // Encodes one record into the checkpoint buffer at `current`, provided the
 // record exists and its name passes the settings.prefixes filter (an empty
 // prefix list accepts everything). On success `current` is advanced, and
 // checkpoint_header.updates / checkpoint_header.size are increased by one
 // record and by the number of bytes actually written.
 // NOTE(review): each group of string literals ending in ");" below is the
 // argument list of a call whose opening line is not visible in this
 // listing; confirm against the real source.
2407  if (record->exists())
2408  {
2409  // check if the prefix is allowed
2410  if (settings.prefixes.size() > 0)
2411  {
2413  "ThreadSafeContext::save_checkpoint:"
2414  " we have %d prefixes to check against.\n",
2415  (int)settings.prefixes.size());
2416
 // stop scanning as soon as one allowed prefix matches
2417  bool prefix_found = false;
2418  for (size_t j = 0; j < settings.prefixes.size() && !prefix_found; ++j)
2419  {
2421  "ThreadSafeContext::save_checkpoint:"
2422  " checking record %s against prefix %s.\n",
2423  name.c_str(), settings.prefixes[j].c_str());
2424
2425  if (madara::utility::begins_with(name, settings.prefixes[j]))
2426  {
2427  prefix_found = true;
2428  }
2429  } // end for j->prefixes.size
2430
2431  if (!prefix_found)
2432  {
2434  "ThreadSafeContext::save_checkpoint:"
2435  " record has the wrong prefix. Rejected.\n");
2436
2437  return;
2438  }
2439  } // end if prefixes exists
2440
2441  // get the encoded size of the record for checking buffer boundaries
2442  int64_t encoded_size = record->get_encoded_size(name);
2443
2445  "ThreadSafeContext::save_checkpoint:"
2446  " estimated encoded size of update=%d bytes\n",
2447  (int)encoded_size);
2448
2449  if (encoded_size > buffer_remaining)
2450  {
 // NOTE(review): the statement that uses this message is not visible in
 // this listing -- presumably an exception is raised; confirm. The message
 // contains %d placeholders but no matching arguments are visible, and
 // "is needs to be" reads like a typo.
2452  "ThreadSafeContext::save_checkpoint: "
2453  "%d buffer size is not big enough. Partial encoded size "
2454  "already hit %d bytes. CheckpointSettings.buffer_size is "
2455  "needs to be larger.");
2456  } // end if larger than buffer remaining
2457
 // replace the estimate with the number of bytes actually produced
2458  auto pre_write = current;
2459  current = record->write(current, name, buffer_remaining);
2460  encoded_size = current - pre_write;
2461
2462  ++checkpoint_header.updates;
2463  checkpoint_header.size += (uint64_t)encoded_size;
2464
2466  "ThreadSafeContext::save_checkpoint:"
2467  " chkpt.header.size=%d, current->buffer delta=%d\n",
2468  (int)checkpoint_header.size, (int)(current - buffer.get_ptr()));
2469  } // if record exists
2470 }
2471 
2472 namespace
2473 {
 // VariablesLister implementation that walks the context's locally modified
 // variables (context.get_local_modified()). A ContextGuard on the context
 // is held from start() until next() reports the end of the list.
 // NOTE(review): the declaration of the map_ member used below is on a line
 // not visible in this listing, so whether it holds a copy or a reference
 // cannot be told from here.
2474 class ContextLocalModifiedsLister : public VariablesLister
2475 {
2476 public:
 // Stores a pointer to `context`; the context must outlive this lister.
2477  ContextLocalModifiedsLister(const ThreadSafeContext& context)
2478  : context_(&context)
2479  {
2480  }
2481
 // Acquires the context guard, captures the locally modified map and
 // positions the iterator on its first entry. settings.reset_checkpoint
 // requests that context_->reset_checkpoint() be called once iteration ends.
2482  void start(const CheckpointSettings& settings) override
2483  {
2484  guard_.reset(new ContextGuard(*context_));
2485  map_ = context_->get_local_modified();
2486  iter_ = map_.begin();
2487
2488  if (settings.reset_checkpoint)
2489  {
2490  clear_modifieds_ = true;
2491  }
2492  }
2493
 // Returns the next (name, record) pair, or {nullptr, nullptr} once the
 // list is exhausted.
2494  std::pair<const char*, const KnowledgeRecord*> next() override
2495  {
2496  std::pair<const char*, const KnowledgeRecord*> ret{nullptr, nullptr};
2497
2498  if (iter_ == map_.end())
2499  {
 // the guard is released before reset_checkpoint() is invoked
2500  guard_.reset();
2501  if (clear_modifieds_)
2502  {
2503  context_->reset_checkpoint();
2504  }
2505  return ret;
2506  }
2507
 // hand out the key and the record pointer of the current entry
2508  ret.first = iter_->first;
2509  ret.second = iter_->second.get_record_unsafe();
2510
2511  ++iter_;
2512
2513  return ret;
2514  }
2515
2516 private:
 // context being listed (not owned)
2517  const ThreadSafeContext* context_;
 // guard held between start() and the end of iteration
2518  std::unique_ptr<ContextGuard> guard_;
 // current position in map_
2520  VariableReferenceMap::iterator iter_;
 // set by start() when settings.reset_checkpoint is true
2521  bool clear_modifieds_ = false;
2522 };
2523 }
2524 
2525 static void checkpoint_write_records(const ThreadSafeContext& context,
2526  logger::Logger* logger_, const CheckpointSettings& settings,
2527  transport::MessageHeader& checkpoint_header, char*& current,
2528  utility::ScopedArray<char>& buffer, int64_t& buffer_remaining)
2529 {
2530  ContextLocalModifiedsLister default_lister(context);
2531 
2532  VariablesLister* lister = settings.variables_lister;
2533 
2534  if (!lister)
2535  {
2536  lister = &default_lister;
2537  }
2538 
2539  lister->start(settings);
2540  for (;;)
2541  {
2542  auto e = lister->next();
2543  if (e.second == nullptr)
2544  {
2545  break;
2546  }
2547 
2548  auto record = e.second;
2549 
2550  checkpoint_write_record(logger_, e.first, record, settings,
2551  checkpoint_header, current, buffer, buffer_remaining);
2552  }
2553 }
2554 
2556  logger::Logger* logger_, uint64_t clock_,
2557  const CheckpointSettings& settings, std::fstream& file, FileHeader& meta,
2558  transport::MessageHeader& checkpoint_header)
2559 {
      // Adds one more checkpoint state to an already opened checkpoint file.
      // Records are serialized behind a checkpoint header in a scratch buffer,
      // the buffer is passed through settings.encode() and written at the
      // offset returned by update_checkpoint_header(), and then the file
      // header (meta) at the front of the file is rewritten. The file is
      // closed before returning, whether or not anything was written.
      //
      // NOTE(review): the embedded source line numbers skip in several places.
      // The first line of this signature and the line that opens each bare
      // string-literal group below are not part of this listing; presumably
      // those are the logging/throw statements that consume the strings.
      // Confirm against the original file.
2560  int64_t total_written(0);
2561 
      // scratch buffer sized by settings.buffer_size; buffer_remaining is
      // handed by reference to the write helpers below
2562  int64_t max_buffer(settings.buffer_size);
2563  int64_t buffer_remaining(max_buffer);
2564  utility::ScopedArray<char> buffer = new char[max_buffer];
2565 
      // checkpoint_start is used below as the file offset for this checkpoint
2566  uint64_t checkpoint_start = update_checkpoint_header(logger_, clock_,
2567  settings, file, meta, checkpoint_header, buffer_remaining, buffer);
2568 
      // only write when a custom lister was supplied or the context reports
      // locally modified records
2569  if (settings.variables_lister != nullptr ||
2570  context.get_local_modified().size() != 0)
2571  {
2572  // skip over the checkpoint header. We'll write this later with the records
2573 
2575  "ThreadSafeContext::save_checkpoint:"
2576  " fseek set to %d\n",
2577  (int)(checkpoint_start));
2578 
2579  // set the file pointer to the checkpoint header start
2580  // fseek (file, (long)checkpoint_start, SEEK_SET);
2581  file.seekp(checkpoint_start);
2582 
2583  // start updates just past the checkpoint header's buffer location
      // (the header is written again further down, after the records have
      // been added)
2584  char* current = checkpoint_header.write(buffer.get_ptr(), buffer_remaining);
2585 
2587  "ThreadSafeContext::save_checkpoint:"
2588  " chkpt.header.size=%d, current->buffer delta=%d\n",
2589  (int)checkpoint_header.encoded_size(),
2590  (int)(current - buffer.get_ptr()));
2591 
      // serialize the listed records behind the header
2592  checkpoint_write_records(context, logger_, settings, checkpoint_header,
2593  current, buffer, buffer_remaining);
2594 
      // the file now holds one more state
2595  ++meta.states;
2596 
2598  "ThreadSafeContext::save_checkpoint:"
2599  " writing final data to checkpoint for state #%d\n",
2600  (int)meta.states);
2601 
2603  "ThreadSafeContext::save_checkpoint:"
2604  " chkpt.header: size=%d, updates=%d\n",
2605  (int)checkpoint_header.size, (int)checkpoint_header.updates);
2606 
2607  int total_encoded = 0;
2608 
      // flush only if something was placed into the buffer
2609  if (buffer_remaining != max_buffer)
2610  {
      // number of bytes between the buffer start and the write cursor
2611  total_written = (int64_t)(current - buffer.get_ptr());
2612 
2614  "ThreadSafeContext::save_checkpoint:"
2615  " encoding %d bytes in checkpoint\n",
2616  (int)total_written);
2617 
      // write the header at the front of the buffer a second time
2618  current = checkpoint_header.write(buffer.get_ptr(), buffer_remaining);
2619 
2620  // call encode with any buffer filters
2621  total_encoded = settings.encode(
2622  buffer.get_ptr(), (int)total_written, (int)max_buffer);
2623 
2624  if (total_encoded < 0)
2625  {
2627  "ThreadSafeContext::save_context: "
2628  "encode () returned a negative encoding size. Bad filter/encode.");
      // NOTE(review): the statement that consumes this message is on a
      // line missing from this listing (presumably a throw -- confirm).
2629  }
2630 
2632  "ThreadSafeContext::save_checkpoint:"
2633  " encoded %d bytes in buffer. Writing to disk at offset %d bytes\n",
2634  (int)total_encoded, (int)checkpoint_start);
2635 
2636  // if (settings.buffer_filters.size () > 0)
2637  // {
2638  // total_encoded += (int)filters::BufferFilterHeader::encoded_size ();
2639  // }
2640 
      // write the encoded bytes at the position set by seekp() above
2641  file.write(buffer.get_ptr(), total_encoded);
2642  // fwrite (buffer.get_ptr (), (size_t)(total_encoded), 1, file);
2643 
      // account for the newly written bytes in the file header
2644  meta.size += (uint64_t)total_encoded;
2645 
2647  "ThreadSafeContext::save_checkpoint:"
2648  " meta.size updated to %d bytes\n",
2649  total_encoded, (int)meta.size);
      // NOTE(review): the format string above has a single %d but two
      // arguments follow it; assuming printf-style formatting, the value
      // printed is total_encoded and meta.size is ignored.
2650  }
2651 
2652  buffer_remaining = max_buffer;
2653  // fseek (file, (long)checkpoint_start, SEEK_SET);
      // move to the beginning of the file, where the file header is rewritten
2654  file.seekp(0, file.beg);
2655 
2657  "ThreadSafeContext::save_checkpoint:"
2658  " new file meta: size=%d, states=%d, "
2659  " lastchkpt.start=%d, lastchkpt.size=%d, encoded=%d\n",
2660  (int)meta.size, (int)meta.states, (int)checkpoint_start,
2661  (int)checkpoint_header.size, (int)total_encoded);
2662 
2663  // update the meta data at the front
2664  // fseek (file, 0, SEEK_SET);
2665 
2667  "ThreadSafeContext::save_checkpoint:"
2668  " updating file meta data at beginning in the file\n");
      // serialize the updated file header at the start of the scratch buffer
2669  buffer_remaining = max_buffer;
2670  current = meta.write(buffer.get_ptr(), buffer_remaining);
2671 
2672  // fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
      // write exactly FileHeader::encoded_size() bytes at offset 0
2673  file.write(buffer.get(), FileHeader::encoded_size());
2674 
2675  } // if there are local checkpointing records
2676 
2677  // fclose (file);
2678  file.close();
2679 }
2680 
// Writes the first checkpoint into a freshly opened file.
//
// A single scratch buffer is laid out as: file header (meta) at the front,
// then the checkpoint header at offset FileHeader::encoded_size(), then the
// records. The part after the file header is passed through
// settings.encode(), meta.size is set to the encoded size, the file header is
// serialized at the front of the buffer, and the buffer is written from the
// beginning of the file. The file is closed before returning.
//
// NOTE(review): the embedded source line numbers skip in several places. The
// line that opens each bare string-literal group below is not part of this
// listing; presumably those are logging/throw statements. Confirm against the
// original file.
2681 static void checkpoint_do_initial(const ThreadSafeContext& context,
2682  logger::Logger* logger_, uint64_t clock_,
2683  const CheckpointSettings& settings, std::fstream& file, FileHeader& meta,
2684  transport::MessageHeader& checkpoint_header)
2685 {
      // offset of the checkpoint header inside the buffer
2686  int file_header_size = (int)FileHeader::encoded_size();
2687 
      // scratch buffer sized by settings.buffer_size
2688  int64_t max_buffer(settings.buffer_size);
2689  int64_t buffer_remaining(max_buffer);
2690  utility::ScopedArray<char> buffer = new char[max_buffer];
2691 
      // current is the write cursor handed on to the record writer
2692  char* current = init_checkpoint_header(logger_, clock_, settings, meta,
2693  checkpoint_header, buffer_remaining, buffer);
2694 
2696  "ThreadSafeContext::save_checkpoint:"
2697  " writing diff records\n");
2698 
2699  checkpoint_write_records(context, logger_, settings, checkpoint_header,
2700  current, buffer, buffer_remaining);
2701 
      // bytes used in the buffer so far, file header included
2702  char* final_position = current;
2703  int full_buffer = final_position - buffer.get_ptr();
2704 
2706  "ThreadSafeContext::save_checkpoint:"
2707  " final_position indicates a buffer of %d bytes,"
2708  " encode buffer is %d bytes\n",
2709  full_buffer, full_buffer - file_header_size);
2710 
2711  // write the final sizes for the checkpoint at [108]
      // ([108] presumably refers to the encoded file header size -- confirm)
      // NOTE(review): write() takes its size argument by reference and updates
      // the remaining room, so max_buffer no longer holds the allocated size
      // after this call; the reduced value is what encode() receives below.
      // Confirm that this is intended.
2712  current =
2713  checkpoint_header.write(buffer.get_ptr() + file_header_size, max_buffer);
2714 
2715  // call encode with any buffer filters on [108]
2716  int total =
2717  settings.encode(buffer.get_ptr() + (int)FileHeader::encoded_size(),
2718  (int)(full_buffer - file_header_size), (int)max_buffer);
2719 
2720  if (total < 0)
2721  {
2723  "ThreadSafeContext::save_checkpoint: "
2724  "encode () returned a negative encoding size. Bad filter/encode.");
      // NOTE(review): the statement that consumes this message is on a line
      // missing from this listing (presumably a throw -- confirm).
2725  }
2726 
      // the file header records the encoded size of everything after it
2727  meta.size = (uint64_t)total;
2728 
      // serialize the file header at the front of the buffer
2729  current = meta.write(buffer.get_ptr(), max_buffer);
2730 
2731  // update the meta data at the front
2732  // fseek (file, 0, SEEK_SET);
2733  file.seekp(0, file.beg);
2734 
2736  "ThreadSafeContext::save_checkpoint:"
2737  " encoding with buffer filters: meta.size=%d, chkpt.size=%d, "
2738  "encoded=%d, chkpt.offset=%d.\n",
2739  (int)meta.size, (int)checkpoint_header.size, (int)total,
2740  file_header_size);
2741 
2742  // fwrite (buffer.get_ptr (),
2743  // (size_t)total + (size_t)FileHeader::encoded_size (), 1, file);
      // file header plus encoded checkpoint go out in a single write
2744  file.write(buffer.get(), (size_t)total + (size_t)file_header_size);
2745 
2747  "ThreadSafeContext::save_checkpoint:"
2748  " wrote: %d bytes to file from beginning.\n",
2749  (int)total + (int)FileHeader::encoded_size());
2750 
2751  // fclose (file);
2752  file.close();
2753 }
2754 
2756  const CheckpointSettings& settings) const
2757 {
      // Saves a checkpoint to settings.filename. If the file can be opened for
      // reading and writing, it is handed to one helper; otherwise the file is
      // created and handed to another. Returns -1 when the file cannot be
      // created, and checkpoint_header.size otherwise.
      //
      // NOTE(review): the first line of this signature, the helper names of
      // the two calls below and the line that opens each bare string-literal
      // group are not part of this listing (the embedded line numbers skip).
      // Presumably the helpers are checkpoint_do_incremental and
      // checkpoint_do_initial; confirm against the original file.
2759  "ThreadSafeContext::save_checkpoint:"
2760  " opening file %s\n",
2761  settings.filename.c_str());
2762 
2763  // FILE * file = fopen (settings.filename.c_str (), "rb+");
      // in | out without trunc: the open fails if the file does not exist
2764  std::fstream file(
2765  settings.filename, std::ios::in | std::ios::out | std::ios::binary);
2766 
2767  FileHeader meta;
2768  transport::MessageHeader checkpoint_header;
2769  // int chkpt_header_size = (int)checkpoint_header.encoded_size ();
2770 
2771  if (file)
2772  {
      // existing file: arguments of the helper call (its name is on a line
      // missing from this listing)
2774  *this, logger_, clock_, settings, file, meta, checkpoint_header);
2775  } // if file is opened
2776  else
2777  {
2779  "ThreadSafeContext::save_checkpoint:"
2780  " checkpoint doesn't exist. Creating.\n");
2781 
      // create the file for binary output
2782  file.open(settings.filename, std::ios::out | std::ios::binary);
2783 
      // a new file starts with one state and the configured originator; the
      // size passed is the smaller of the destination array size and the
      // originator length plus one
2784  meta.states = 1;
2785  utility::strncpy_safe(meta.originator, settings.originator.c_str(),
2786  sizeof(meta.originator) < settings.originator.size() + 1
2787  ? sizeof(meta.originator)
2788  : settings.originator.size() + 1);
2789 
2790  // if the new file creation for wb was unsuccessful
2791  if (!file)
2792  {
2794  "ThreadSafeContext::save_checkpoint:"
2795  " couldn't create checkpoint file: %s.\n",
2796  settings.filename.c_str());
2797 
2798  return -1;
2799  }
2800 
      // new file: arguments of the helper call (its name is on a line missing
      // from this listing)
2802  *this, logger_, clock_, settings, file, meta, checkpoint_header);
2803  } // end if we need to create a new file
2804 
2805  return checkpoint_header.size;
2806 }
2807 
2809  const std::string& filename, const std::string& id) const
2810 {
      // Convenience overload: builds default CheckpointSettings, fills in the
      // filename and the originator id, and forwards to the settings-based
      // save_checkpoint, returning its result.
      // NOTE(review): the first line of this signature is not part of this
      // listing (the embedded line numbers skip from 2806 to 2809).
2811  CheckpointSettings settings;
2812  settings.filename = filename;
2813  settings.originator = id;
2814 
2815  return save_checkpoint(settings);
2816 }
2817 }
2818 }
#define madara_logger_checked_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:62
bool clear_modifieds_
VariableReferenceMap map_
std::unique_ptr< ContextGuard > guard_
const ThreadSafeContext * context_
VariableReferenceMap::iterator iter_
madara::knowledge::KnowledgeRecord KnowledgeRecord
An exception for attempting to access an invalid context.
An exception for general memory errors like out-of-memory.
An abstract base class defines a simple abstract implementation of an expression tree node.
Definition: ComponentNode.h:37
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
Class for iterating binary checkpoint files.
void start()
Begin by reading any header information.
std::pair< std::string, KnowledgeRecord > next()
Get the next update from the checkpoint file.
bool is_open() const
Check if underlying file is open.
int64_t get_total_read() const
Get total number of bytes read so far during iteration.
Holds settings for checkpoints to load or save.
std::string originator
the originator id of the checkpoint
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoint...
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
VariablesLister * variables_lister
Object which will be used to extract variables for checkpoint saving.
int decode(char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
int encode(char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
filters::BufferFilters buffer_filters
buffer filters.
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints
Compiled, optimized KaRL logic.
std::string logic
the logic that was compiled
madara::expression::ExpressionTree expression
the expression tree
Defines a file header which is the default for KaRL checkpointing.
Definition: FileHeader.h:37
static uint32_t encoded_size(void)
Returns the size of the encoded FileHeader class, which may be different from sizeof (FileHeader) bec...
Definition: FileHeader.cpp:26
uint64_t initial_timestamp
the timestamp for the initial checkpointing
Definition: FileHeader.h:107
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a FileHeader instance to a buffer and updates the amount of buffer room remaining.
Definition: FileHeader.cpp:164
char originator[64]
the originator of the message (host:port)
Definition: FileHeader.h:127
uint64_t size
the size of this header plus the updates
Definition: FileHeader.h:97
static bool file_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
Definition: FileHeader.h:89
uint64_t last_timestamp
the timestamp for the last checkpoint
Definition: FileHeader.h:112
uint64_t states
the number of states checkpointed in the file stream
Definition: FileHeader.h:102
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a FileHeader instance from a buffer and updates the amount of buffer room remaining.
Definition: FileHeader.cpp:31
This class stores a function definition.
Definition: Functions.h:46
This class encapsulates an entry in a KnowledgeBase.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
void set_full(const KnowledgeRecord &new_value)
Sets the value and meta data from another KnowledgeRecord.
char * write(char *buffer, int64_t &buffer_remaining) const
Writes a KnowledgeRecord instance to a buffer and updates the amount of buffer room remaining.
uint64_t clock
last modification lamport clock time
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy toi, clock, and write_quality.
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
uint32_t quality
priority of the update
int64_t get_encoded_size(const std::string &key) const
Returns the encoded size of the record.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
Settings for applying knowledge updates.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false)
Settings for applying knowledge updates.
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
This class stores variables and their values for use by any entity needing state information in a thr...
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::string file_to_string(CheckpointSettings &checkpoint_settings)
Loads and returns a karl script from a file with encode/decode.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically returns the current value of a variable.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
KnowledgeRecord evaluate_file(CheckpointSettings &checkpoint_settings, const KnowledgeUpdateSettings &update_settings=KnowledgeUpdateSettings(true, true, true, false))
Loads and evaluates a karl script from a file.
logger::Logger * logger_
Logger for printing.
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
void mark_modified(const VariableReference &variable, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Marks the variable reference as updated for the purposes of sending or checkpointing knowledge (for g...
const VariableReferenceMap & get_local_modified(void) const
Retrieves a list of modified local variables.
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a plaintext string.
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
FunctionMap functions_
map of function names to functions
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
void mark_and_signal(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &reqs, const KnowledgeUpdateSettings &settings)
Copies variables and values from source to this context.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
std::vector< std::string > expansion_splitters_
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Optimized reference to a variable within the knowledge base.
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord Do not use this pointer unless you've locked the ...
KnowledgeMap::value_type * pair_ptr
virtual std::pair< const char *, const KnowledgeRecord * > next()=0
virtual void start(const CheckpointSettings &settings)=0
Provides an interface for external functions into the MADARA KaRL variable settings.
Definition: Variables.h:53
A multi-threaded logger for logging to one or more destinations.
Definition: Logger.h:165
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
uint32_t updates
the number of knowledge variable updates in the message
uint64_t clock
the clock of the sender when the message was generated
uint64_t size
the size of this header plus the updates
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
T * get(void)
get the underlying pointer
Definition: ScopedArray.inl:78
constexpr string_t string
constexpr binary_t binary
Provides functions and classes for the distributed knowledge base.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
static void checkpoint_do_incremental(const ThreadSafeContext &context, logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header)
std::map< const char *, VariableReference, utility::ComparisonLessThan > VariableReferenceMap
a map of variable references
static char * init_checkpoint_header(logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, FileHeader &meta, transport::MessageHeader &checkpoint_header, int64_t &buffer_remaining, utility::ScopedArray< char > &buffer)
static void checkpoint_write_records(const ThreadSafeContext &context, logger::Logger *logger_, const CheckpointSettings &settings, transport::MessageHeader &checkpoint_header, char *&current, utility::ScopedArray< char > &buffer, int64_t &buffer_remaining)
static uint64_t update_checkpoint_header(logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header, int64_t &buffer_remaining, utility::ScopedArray< char > &buffer)
static void checkpoint_write_record(logger::Logger *logger_, const std::string &name, const KnowledgeRecord *record, const CheckpointSettings &settings, transport::MessageHeader &checkpoint_header, char *&current, utility::ScopedArray< char > &buffer, int64_t &buffer_remaining)
static void checkpoint_do_initial(const ThreadSafeContext &context, logger::Logger *logger_, uint64_t clock_, const CheckpointSettings &settings, std::fstream &file, FileHeader &meta, transport::MessageHeader &checkpoint_header)
std::vector< KnowledgeRecord > FunctionArguments
::std::map< std::string, KnowledgeRecord > KnowledgeMap
std::vector< VariableReference > VariableReferences
a vector of variable references
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
Definition: Utility.cpp:487
MADARA_EXPORT bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
Definition: Utility.inl:650
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Definition: Utility.cpp:376
MADARA_EXPORT std::string extract_path(const std::string &name)
Extracts the path of a filename.
Definition: Utility.inl:374
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:638
Copyright(c) 2020 Galois.
Holds settings requirements for knowledge, usually in copying.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).