MADARA  3.2.3
ThreadSafeContext.cpp
Go to the documentation of this file.
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <iterator>
5 #include <memory>
6 
7 #include <string.h>
8 
10 
12 
15 #include <stdio.h>
16 #include <time.h>
19 
20 namespace madara { namespace knowledge {
21 
// Constructor fragment: the declarator line is absent from this listing.
// The initializer list binds changed_ to mutex_ when
// MADARA_CONDITION_MUTEX_CONSTRUCTOR is defined, zeroes clock_, allocates the
// expression interpreter unless _MADARA_NO_KARL_ is defined, and points
// logger_ at the global logger. The body registers "{" and "}" as the
// expansion splitters.
22 // constructor
24  :
25 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
26  changed_ (mutex_),
27 #endif
28  clock_ (0)
29 #ifndef _MADARA_NO_KARL_
30 ,
// raw owning pointer; released with delete in the destructor
31  interpreter_ (new madara::expression::Interpreter ())
32 #endif // _MADARA_NO_KARL_
33  , logger_ (logger::global_logger.get ())
34 {
35  expansion_splitters_.push_back ("{");
36  expansion_splitters_.push_back ("}");
37 }
38 
// Destructor fragment: the declarator line is absent from this listing.
// Frees the interpreter allocated by the constructor; nothing is freed when
// _MADARA_NO_KARL_ is defined.
39 // destructor
41 {
42 #ifndef _MADARA_NO_KARL_
43  delete interpreter_;
44 #endif // _MADARA_NO_KARL_
45 
46 }
47 
// Record lookup by name. The lines carrying the return type and function
// name are absent from this listing.
// Holds mutex_ for the whole call. The key is passed through expand_statement
// when settings.expand_variables is set. Returns 0 for an empty key,
// otherwise the address of map_[key]; an unknown key is inserted by that
// lookup.
55  const std::string & key,
56  const KnowledgeReferenceSettings & settings)
57 {
58  std::string key_actual;
59  const std::string * key_ptr;
60  MADARA_GUARD_TYPE guard (mutex_);
61 
// key_ptr points at either the expanded copy or the caller's string
62  if (settings.expand_variables)
63  {
64  key_actual = expand_statement (key);
65  key_ptr = &key_actual;
66  }
67  else
68  key_ptr = &key;
69 
70  if (*key_ptr == "")
71  return 0;
72 
73  // if the variable doesn't exist, hash maps create a record automatically
74  // when used in this manner
75  return &map_[*key_ptr];
76 }
77 
78 
// Find-or-create lookup that yields a reference to a map entry. The lines
// carrying the return type and function name are absent from this listing.
// Holds mutex_ for the whole call. Returns a default-constructed result for
// an empty key; otherwise returns the address of the entry for the key,
// inserting a default-constructed record first when the key is not present.
81  const std::string & key,
82  const KnowledgeReferenceSettings & settings)
83 {
84  std::string key_actual;
85  const std::string * key_ptr;
86  MADARA_GUARD_TYPE guard (mutex_);
87 
88  // expand the key if the user asked for it
89  if (settings.expand_variables)
90  {
91  key_actual = expand_statement (key);
92  key_ptr = &key_actual;
93  }
94  else
95  key_ptr = &key;
96 
97  if (*key_ptr == "") {
98  return {};
99  }
100 
// lower_bound serves both as the search and as the insertion hint
101  auto iter = map_.lower_bound(*key_ptr);
102  if (iter == map_.end() || iter->first != *key_ptr) {
103  iter = map_.emplace_hint(iter, std::piecewise_construct,
104  std::forward_as_tuple(*key_ptr), std::forward_as_tuple());
105  }
106 
107  return &*iter;
108 }
109 
110 
113  const std::string & key,
114  const KnowledgeReferenceSettings & settings) const
115 {
116  std::string key_actual;
117  const std::string * key_ptr;
118  MADARA_GUARD_TYPE guard (mutex_);
119 
120  VariableReference record;
121 
122  // expand the key if the user asked for it
123  if (settings.expand_variables)
124  {
125  key_actual = expand_statement (key);
126  key_ptr = &key_actual;
127  }
128  else
129  key_ptr = &key;
130 
131  if (*key_ptr == "") {
132  return {};
133  }
134 
135  KnowledgeMap::const_iterator found = map_.find (*key_ptr);
136  return {const_cast<VariableReference::pair_ptr>(&*found)};
137 }
138 
// Stores a character buffer in the referenced record through
// KnowledgeRecord::set_xml. The declarator line is absent from this listing.
// Returns 0 on success, -1 when the reference has no record, and -2 when
// always_overwrite is off and the record's write_quality is lower than its
// current quality.
139 // set the value of a variable
140 int
142  const VariableReference & variable,
143  const char * value, size_t size,
144  const KnowledgeUpdateSettings & settings)
145 {
146 
147  MADARA_GUARD_TYPE guard (mutex_);
148  auto record = variable.get_record_unsafe();
149 
150  if (record)
151  {
152  // check if we have the appropriate write quality
153  if (!settings.always_overwrite &&
154  record->write_quality < record->quality)
155  return -2;
156 
157  record->set_xml (value, size);
// a successful local write stamps the record with the writer's quality
158  record->quality = record->write_quality;
159 
160  mark_and_signal (variable, settings);
161  }
162  else
163  return -1;
164 
165  return 0;
166 }
167 
// Stores a character buffer in the referenced record through
// KnowledgeRecord::set_text. The declarator line is absent from this listing.
// Returns 0 on success, -1 when the reference has no record, and -2 when
// always_overwrite is off and the record's write_quality is lower than its
// current quality.
168 // set the value of a variable
169 int
171  const VariableReference & variable,
172  const char * value, size_t size,
173  const KnowledgeUpdateSettings & settings)
174 {
175  MADARA_GUARD_TYPE guard (mutex_);
176  auto record = variable.get_record_unsafe();
177 
178  if (record)
179  {
180  // check if we have the appropriate write quality
181  if (!settings.always_overwrite &&
182  record->write_quality < record->quality)
183  return -2;
184 
185  record->set_text (value, size);
// a successful local write stamps the record with the writer's quality
186  record->quality = record->write_quality;
187 
188  mark_and_signal (variable, settings);
189  }
190  else
191  return -1;
192 
193  return 0;
194 }
195 
// Stores a byte buffer in the referenced record through
// KnowledgeRecord::set_jpeg. The declarator line is absent from this listing.
// Returns 0 on success, -1 when the reference has no record, and -2 when
// always_overwrite is off and the record's write_quality is lower than its
// current quality.
196 // set the value of a variable
197 int
199  const VariableReference & variable,
200  const unsigned char * value, size_t size,
201  const KnowledgeUpdateSettings & settings)
202 {
203  MADARA_GUARD_TYPE guard (mutex_);
204  auto record = variable.get_record_unsafe();
205 
206  if (record)
207  {
208  // check if we have the appropriate write quality
209  if (!settings.always_overwrite &&
210  record->write_quality < record->quality)
211  return -2;
212 
213  record->set_jpeg (value, size);
// a successful local write stamps the record with the writer's quality
214  record->quality = record->write_quality;
215 
216  mark_and_signal (variable, settings);
217  }
218  else
219  return -1;
220 
221  return 0;
222 }
223 
// Stores a byte buffer in the referenced record through
// KnowledgeRecord::set_file. The declarator line is absent from this listing.
// Returns 0 on success, -1 when the reference has no record, and -2 when
// always_overwrite is off and the record's write_quality is lower than its
// current quality.
224 // set the value of a variable
225 int
227  const VariableReference & variable,
228  const unsigned char * value, size_t size,
229  const KnowledgeUpdateSettings & settings)
230 {
231  MADARA_GUARD_TYPE guard (mutex_);
232  auto record = variable.get_record_unsafe();
233 
234  if (record)
235  {
236  // check if we have the appropriate write quality
237  if (!settings.always_overwrite &&
238  record->write_quality < record->quality)
239  return -2;
240 
241  record->set_file (value, size);
// a successful local write stamps the record with the writer's quality
242  record->quality = record->write_quality;
243 
244  mark_and_signal (variable, settings);
245  }
246  else
247  return -1;
248 
249  return 0;
250 }
251 
// Loads the referenced record from a file through
// KnowledgeRecord::read_file. The declarator line is absent from this
// listing.
// Returns -1 when the reference has no record, -2 when always_overwrite is
// off and the record's write_quality is lower than its current quality, and
// otherwise whatever read_file returned. The record is marked as modified
// regardless of the value read_file returned.
252 // set the value of a variable
253 int
255  const VariableReference & variable,
256  const std::string & filename,
257  const KnowledgeUpdateSettings & settings)
258 {
259  int return_value = 0;
260  MADARA_GUARD_TYPE guard (mutex_);
261  auto record = variable.get_record_unsafe();
262 
263  if (record)
264  {
265  // check if we have the appropriate write quality
266  if (!settings.always_overwrite &&
267  record->write_quality < record->quality)
268  return -2;
269 
270  return_value = record->read_file (filename);
271  record->quality = record->write_quality;
272 
273  mark_and_signal (variable, settings);
274  }
275  else
276  return return_value = -1;
277 
278  return return_value;
279 }
280 
283 uint32_t
285  const std::string & key,
286  const KnowledgeReferenceSettings & settings)
287 {
288  // enter the mutex
289  std::string key_actual;
290  const std::string * key_ptr;
291  MADARA_GUARD_TYPE guard (mutex_);
292 
293  if (settings.expand_variables)
294  {
295  key_actual = expand_statement (key);
296  key_ptr = &key_actual;
297  }
298  else
299  key_ptr = &key;
300 
301  // find the key in the knowledge base
302  KnowledgeMap::iterator found = map_.find (*key_ptr);
303 
304  // create the variable if it has never been written to before
305  // and update its current value quality to the quality parameter
306 
307  if (found != map_.end ())
308  return map_[*key_ptr].quality;
309 
310  // default quality is 0
311  return 0;
312 }
313 
316 uint32_t
318  const std::string & key,
319  const KnowledgeReferenceSettings & settings)
320 {
321  // enter the mutex
322  std::string key_actual;
323  const std::string * key_ptr;
324  MADARA_GUARD_TYPE guard (mutex_);
325 
326  if (settings.expand_variables)
327  {
328  key_actual = expand_statement (key);
329  key_ptr = &key_actual;
330  }
331  else
332  key_ptr = &key;
333 
334  // find the key in the knowledge base
335  KnowledgeMap::iterator found = map_.find (*key_ptr);
336 
337  // create the variable if it has never been written to before
338  // and update its current value quality to the quality parameter
339 
340  if (found != map_.end ())
341  return map_[*key_ptr].write_quality;
342 
343  // default quality is 0
344  return 0;
345 }
346 
// Raises (or forces) the stored quality of a variable. The declarator line
// is absent from this listing.
// Holds mutex_ for the whole call. Returns 0 for an empty key. Otherwise the
// stored quality is replaced when the variable did not exist yet, when
// force_update is set, or when the new quality is higher than the stored
// one; the quality stored after that step is returned.
349 uint32_t
351  const std::string & key, uint32_t quality,
352  bool force_update,
353  const KnowledgeReferenceSettings & settings)
354 {
355  // enter the mutex
356  std::string key_actual;
357  const std::string * key_ptr;
358  MADARA_GUARD_TYPE guard (mutex_);
359 
360  if (settings.expand_variables)
361  {
362  key_actual = expand_statement (key);
363  key_ptr = &key_actual;
364  }
365  else
366  key_ptr = &key;
367 
368  // check for null key
369  if (*key_ptr == "")
370  return 0;
371 
372  // find the key in the knowledge base
373  KnowledgeMap::iterator found = map_.find (*key_ptr);
374 
375  // create the variable if it has never been written to before
376  // and update its current value quality to the quality parameter
377 
// the found == end () test comes first, so found is only dereferenced when valid
378  if (found == map_.end () || force_update || quality > found->second.quality)
379  map_[*key_ptr].quality = quality;
380 
381  // return current quality
// map_[] inserts the variable here if the branch above did not run and it was absent
382  return map_[*key_ptr].quality;
383 }
384 
// Sets the write quality of a variable, creating the variable when it does
// not exist. The declarator line is absent from this listing.
// Holds mutex_ for the whole call.
// NOTE(review): unlike the neighbouring functions there is no empty-key
// check, so an empty key creates a record under "" - confirm this is intended.
386 void
388  const std::string & key, uint32_t quality,
389  const KnowledgeReferenceSettings & settings)
390 {
391  // enter the mutex
392  std::string key_actual;
393  const std::string * key_ptr;
394  MADARA_GUARD_TYPE guard (mutex_);
395 
396  if (settings.expand_variables)
397  {
398  key_actual = expand_statement (key);
399  key_ptr = &key_actual;
400  }
401  else
402  key_ptr = &key;
403 
404  // create the variable if it has never been written to before
405  // and update its local process write quality to the quality parameter
406  map_[*key_ptr].write_quality = quality;
407 }
408 
// Conditionally assigns an integer value to a variable. The declarator line
// is absent from this listing.
// Holds mutex_ for the whole call. Return values:
//    1  the value was written and the variable marked as modified
//    0  the variable already held an equal value
//   -1  the (possibly expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  equal quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// variable does not exist yet. The stored quality is replaced unless the
// result is -2, and the record clock and the context clock are raised to
// clock when it is larger, whatever the result.
413 int
415  const std::string & key, KnowledgeRecord::Integer value,
416  uint32_t quality, uint64_t clock,
417  const KnowledgeUpdateSettings & settings)
418 {
419  int result = 1;
420 
421  // enter the mutex
422  std::string key_actual;
423  const std::string * key_ptr;
424  MADARA_GUARD_TYPE guard (mutex_);
425 
426  if (settings.expand_variables)
427  {
428  key_actual = expand_statement (key);
429  key_ptr = &key_actual;
430  }
431  else
432  key_ptr = &key;
433 
434  // check for null key
435  if (*key_ptr == "")
436  return -1;
437 
438  // find the key in the knowledge base
439  KnowledgeMap::iterator found = map_.find (*key_ptr);
440 
441  // if it's found, then compare the value
442  if (!settings.always_overwrite && found != map_.end ())
443  {
444  // setup a rhs
445  KnowledgeRecord rhs;
446  rhs.set_value (value);
447 
448  // if we do not have enough quality to update the variable
449  // return -2
450  if (quality < found->second.quality)
451  result = -2;
452 
453  // if we have the same quality, but our clock value
454  // is less than what we've already seen, then return -3
455  else if (quality == found->second.quality &&
456  clock < found->second.clock)
457  result = -3;
458 
459  // check for value already set
460  else if (found->second == rhs)
461  result = 0;
462  } else {
// reached for a new key, or for any key when always_overwrite is set;
// emplace leaves an existing entry in place and returns its iterator
463  auto ret = map_.emplace(std::piecewise_construct,
464  std::forward_as_tuple(*key_ptr),
465  std::make_tuple());
466  found = ret.first;
467  }
468 
469  KnowledgeRecord & record = found->second;
470 
471  // if we need to update quality, then update it
472  if (result != -2 && record.quality != quality)
473  record.quality = quality;
474 
475  // if we need to update the variable clock, then update it
476  if (clock > record.clock)
477  record.clock = clock;
478 
479  // if we need to update the global clock, then update it
480  if (clock > this->clock_)
481  this->clock_ = clock;
482 
483  if (result == 1)
484  {
485  // we have a situation where the value needs to be changed
486  record.set_value (value);
487 
488  mark_and_signal (&*found, settings);
489  }
490 
491  // value was changed
492  return result;
493 }
494 
// Conditionally assigns a double value to a variable. The declarator line is
// absent from this listing.
// Holds mutex_ for the whole call. Return values:
//    1  the value was written and the variable marked as modified
//    0  the variable already held an equal value
//   -1  the (possibly expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  equal quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// variable does not exist yet. The stored quality is replaced unless the
// result is -2, and the record clock and the context clock are raised to
// clock when it is larger, whatever the result.
499 int
501  const std::string & key, double value,
502  uint32_t quality, uint64_t clock,
503  const KnowledgeUpdateSettings & settings)
504 {
505  int result = 1;
506 
507  // enter the mutex
508  std::string key_actual;
509  const std::string * key_ptr;
510  MADARA_GUARD_TYPE guard (mutex_);
511 
512  if (settings.expand_variables)
513  {
514  key_actual = expand_statement (key);
515  key_ptr = &key_actual;
516  }
517  else
518  key_ptr = &key;
519 
520  // check for null key
521  if (*key_ptr == "")
522  return -1;
523 
524  // find the key in the knowledge base
525  KnowledgeMap::iterator found = map_.find (*key_ptr);
526 
527  // if it's found, then compare the value
528  if (!settings.always_overwrite && found != map_.end ())
529  {
530  // setup a rhs
531  KnowledgeRecord rhs;
532  rhs.set_value (value);
533 
534  // if we do not have enough quality to update the variable
535  // return -2
536  if (quality < found->second.quality)
537  result = -2;
538 
539  // if we have the same quality, but our clock value
540  // is less than what we've already seen, then return -3
541  else if (quality == found->second.quality &&
542  clock < found->second.clock)
543  result = -3;
544 
545  // check for value already set
546  else if (found->second == rhs)
547  result = 0;
548  } else {
// reached for a new key, or for any key when always_overwrite is set;
// emplace leaves an existing entry in place and returns its iterator
549  auto ret = map_.emplace(std::piecewise_construct,
550  std::forward_as_tuple(*key_ptr),
551  std::make_tuple());
552  found = ret.first;
553  }
554 
555  KnowledgeRecord & record = found->second;
556 
557  // if we need to update quality, then update it
558  if (result != -2 && record.quality != quality)
559  record.quality = quality;
560 
561  // if we need to update the variable clock, then update it
562  if (clock > record.clock)
563  record.clock = clock;
564 
565  // if we need to update the global clock, then update it
566  if (clock > this->clock_)
567  this->clock_ = clock;
568 
569  if (result == 1)
570  {
571  // we have a situation where the value needs to be changed
572  record.set_value (value);
573 
574  mark_and_signal (&*found, settings);
575  }
576 
577  // value was changed
578  return result;
579 }
580 
// Conditionally assigns a string value to a variable. The declarator line is
// absent from this listing.
// Holds mutex_ for the whole call. Return values:
//    1  the value was written and the variable marked as modified
//    0  the variable already held an equal value
//   -1  the (possibly expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  equal quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// variable does not exist yet. The stored quality is replaced unless the
// result is -2, and the record clock and the context clock are raised to
// clock when it is larger, whatever the result.
585 int
587  const std::string & key, const std::string & value,
588  uint32_t quality, uint64_t clock,
589  const KnowledgeUpdateSettings & settings)
590 {
591  int result = 1;
592 
593  // enter the mutex
594  std::string key_actual;
595  const std::string * key_ptr;
596  MADARA_GUARD_TYPE guard (mutex_);
597 
598  if (settings.expand_variables)
599  {
600  key_actual = expand_statement (key);
601  key_ptr = &key_actual;
602  }
603  else
604  key_ptr = &key;
605 
606  // check for null key
607  if (*key_ptr == "")
608  return -1;
609 
610  // find the key in the knowledge base
611  KnowledgeMap::iterator found = map_.find (*key_ptr);
612 
613  // if it's found, then compare the value
614  if (!settings.always_overwrite && found != map_.end ())
615  {
616  // setup a rhs
617  KnowledgeRecord rhs;
618  rhs.set_value (value);
619 
620  // if we do not have enough quality to update the variable
621  // return -2
622  if (quality < found->second.quality)
623  result = -2;
624 
625  // if we have the same quality, but our clock value
626  // is less than what we've already seen, then return -3
627  else if (quality == found->second.quality &&
628  clock < found->second.clock)
629  result = -3;
630 
631  // check for value already set
632  else if (found->second == rhs)
633  result = 0;
634  } else {
// reached for a new key, or for any key when always_overwrite is set;
// emplace leaves an existing entry in place and returns its iterator
635  auto ret = map_.emplace(std::piecewise_construct,
636  std::forward_as_tuple(*key_ptr),
637  std::make_tuple());
638  found = ret.first;
639  }
640 
641  KnowledgeRecord & record = found->second;
642 
643  // if we need to update quality, then update it
644  if (result != -2 && record.quality != quality)
645  record.quality = quality;
646 
647  // if we need to update the variable clock, then update it
648  if (clock > record.clock)
649  record.clock = clock;
650 
651  // if we need to update the global clock, then update it
652  if (clock > this->clock_)
653  this->clock_ = clock;
654 
655  if (result == 1)
656  {
657  // we have a situation where the value needs to be changed
658  record.set_value (value);
659 
660  // otherwise set the value
661  mark_and_signal (&*found, settings);
662  }
663  // value was changed
664  return result;
665 }
666 
667 
672 int
674  const std::string & key, const knowledge::KnowledgeRecord & rhs,
675  const KnowledgeUpdateSettings & settings)
676 {
677  int result = 1;
678 
679  // enter the mutex
680  std::string key_actual;
681  const std::string * key_ptr;
682  MADARA_GUARD_TYPE guard (mutex_);
683 
684  if (settings.expand_variables)
685  {
686  key_actual = expand_statement (key);
687  key_ptr = &key_actual;
688  }
689  else
690  key_ptr = &key;
691 
692  // check for null key
693  if (*key_ptr == "")
694  return -1;
695 
696  // find the key in the knowledge base
697  KnowledgeMap::iterator found = map_.find (*key_ptr);
698 
699  // if it's found, then compare the value
700  if (!settings.always_overwrite && found != map_.end ())
701  {
702  // if we do not have enough quality to update the variable
703  // return -2
704  if (rhs.quality < found->second.quality)
705  result = -2;
706 
707  // if we have the same quality, but our clock value
708  // is less than what we've already seen, then return -3
709  else if (rhs.quality == found->second.quality &&
710  rhs.clock < found->second.clock)
711  result = -3;
712 
713  // if we reach this point, then the record is safe to copy
714  found->second.set_value (rhs);
715 
716  mark_and_signal (&*found, settings);
717  }
718  else
719  {
720  // if we reach this point, then we have to create the record
721  auto ret = map_.emplace(std::piecewise_construct,
722  std::forward_as_tuple(*key_ptr),
723  std::make_tuple());
724  found = ret.first;
725 
726  knowledge::KnowledgeRecord & current_value = found->second;
727  current_value.set_value (rhs);
728 
729  mark_and_signal (&*found, settings);
730  }
731 
732  // if we need to update the global clock, then update it
733  if (rhs.clock >= this->clock_)
734  this->clock_ = rhs.clock + 1;
735 
736  //if (settings.signal_changes)
737  // changed_.MADARA_CONDITION_NOTIFY_ONE ();
738 
739  // value was changed
740  return result;
741 }
742 
747 int
749  const VariableReference & target,
750  const knowledge::KnowledgeRecord & rhs,
751  const KnowledgeUpdateSettings & settings)
752 {
753  int result = 1;
754 
755  MADARA_GUARD_TYPE guard (mutex_);
756  auto record = target.get_record_unsafe();
757 
758  // if it's found, then compare the value
759  if (!settings.always_overwrite && target.is_valid ())
760  {
761  // if we do not have enough quality to update the variable
762  // return -2
763  if (rhs.quality < record->quality)
764  result = -2;
765 
766  // if we have the same quality, but our clock value
767  // is less than what we've already seen, then return -3
768  else if (rhs.quality == record->quality &&
769  rhs.clock < record->clock)
770  result = -3;
771 
772  // if we reach this point, then the record is safe to copy
773  record->set_value (rhs);
774 
775  mark_and_signal (target, settings);
776  }
777 
778  // if we need to update the global clock, then update it
779  if (rhs.clock >= this->clock_)
780  this->clock_ = rhs.clock + 1;
781 
782  // value was changed
783  return result;
784 }
785 
// Wakes one waiter on the changed_ condition. The declarator line is absent
// from this listing. No mutex guard is taken in this body.
789 void
791 {
792  changed_.MADARA_CONDITION_NOTIFY_ONE ();
793 }
794 
// Logs every variable for which exists () is true as "name=value", one per
// line, at the given log level. The declarator line is absent from this
// listing. Holds mutex_ while iterating; array elements are joined with ", ".
795 // print all variables and their values
796 void
798  unsigned int level) const
799 {
800  MADARA_GUARD_TYPE guard (mutex_);
801  for (KnowledgeMap::const_iterator i = map_.begin ();
802  i != map_.end ();
803  ++i)
804  {
805  if (i->second.exists ())
806  {
807  madara_logger_ptr_log (logger_, (int)level, "%s=%s\n",
808  i->first.c_str (), i->second.to_string (", ").c_str ());
809  }
810  }
811 }
812 
// Serializes every variable in map_ into target. The declarator line is
// absent from this listing.
// Each entry is written as key, key_val_delimiter, value; entries are
// separated by record_delimiter. String-typed values are wrapped in single
// quotes and integer/double arrays in square brackets, with array elements
// joined by array_delimiter. Holds mutex_ while iterating. Unlike print,
// no exists () filter is applied here.
813 // print all variables and their values
814 void
816  std::string & target,
817  const std::string & array_delimiter,
818  const std::string & record_delimiter,
819  const std::string & key_val_delimiter) const
820 {
821  MADARA_GUARD_TYPE guard (mutex_);
822  std::stringstream buffer;
823 
824  bool first = true;
825 
826  for (KnowledgeMap::const_iterator i = map_.begin ();
827  i != map_.end ();
828  ++i)
829  {
830  // separate each record with the record_delimiter
831  if (!first)
832  {
833  buffer << record_delimiter;
834  }
835 
836  buffer << i->first;
837 
838  // separate the key/value pairing with the key_val_delimiter
839  buffer << key_val_delimiter;
840 
// opening decoration: quote for strings, bracket for numeric arrays
841  if (i->second.is_string_type ())
842  {
843  buffer << "'";
844  }
845  else if (i->second.type () == i->second.DOUBLE_ARRAY ||
846  i->second.type () == i->second.INTEGER_ARRAY)
847  {
848  buffer << "[";
849  }
850 
851  // use the array_delimiter for the underlying to_string functions
852  buffer << i->second.to_string (array_delimiter);
853 
// closing decoration mirrors the opening one
854  if (i->second.is_string_type ())
855  {
856  buffer << "'";
857  }
858  else if (i->second.type () == i->second.DOUBLE_ARRAY ||
859  i->second.type () == i->second.INTEGER_ARRAY)
860  {
861  buffer << "]";
862  }
863 
864  if (first)
865  first = false;
866  }
867 
868  target = buffer.str ();
869 }
870 
// Expands "{...}" references inside a statement. The lines carrying the
// return type and function name are absent from this listing.
// Text outside braces is copied verbatim. Each outermost "{...}" group is
// first expanded recursively (which handles nested braces) and the result is
// used as a variable name whose value, obtained through get (), is appended
// to the output. Holds mutex_ for the whole call; the recursive call takes
// the guard again.
// When the braces do not balance, an error is logged and whatever was built
// so far is still returned.
876  const std::string & statement) const
877 {
878  // enter the mutex
879  MADARA_GUARD_TYPE guard (mutex_);
880 
881  // vectors for holding parsed tokens and pivot_list
// subcount is the current brace depth; begin_exp is the index of the
// outermost open brace
882  size_t subcount = 0;
883  size_t begin_exp = 0;
884 
885  std::stringstream builder;
886 
887  // iterate over the input string
888  for (std::string::size_type i = 0; i < statement.size (); ++i)
889  {
890  // if this is an open brace, increase the subcount
891  if (statement[i] == '{')
892  {
893  ++subcount;
894  if (subcount == 1)
895  begin_exp = i;
896  }
897  // closed brace should decrease subcount
898  else if (statement[i] == '}')
899  {
900  if (subcount == 1)
901  {
902  std::string expandable =
903  statement.substr (begin_exp + 1, i - begin_exp - 1);
904  std::string results = this->expand_statement (expandable);
905  builder << this->get (results);
906  }
// NOTE(review): subcount is unsigned, so a '}' with no matching '{' wraps it
// around instead of going negative; the mismatch is still reported below
907  --subcount;
908  }
909  // otherwise, if this subcount is 0, then we need to add it to our output
910  // we allow anything not in subcount == 0 to be handled through recursion
911  else
912  {
913  if (subcount == 0)
914  builder << statement[i];
915  }
916  }
917 
918  // check to see if all brace counts are appropriate
919  if (subcount != 0)
920  {
// the first line of the logging call is absent from this listing
922  "KARL COMPILE ERROR : Improperly matched braces in %s\n",
923  statement.c_str ());
924  }
925 
926  return builder.str ();
927 }
928 
929 #ifndef _MADARA_NO_KARL_
930 
// Registers a callable under a function name. The declarator line and the
// line declaring the func parameter are absent from this listing.
// Holds mutex_ for the whole call. The name is passed through
// expand_statement when settings.expand_variables is set; an empty name is
// ignored. An existing entry with the same name is replaced.
931 // defines a function by name
932 void
934  const std::string & name,
936  const KnowledgeReferenceSettings & settings)
937 {
938  // enter the mutex
939  std::string key_actual;
940  const std::string * key_ptr;
941  MADARA_GUARD_TYPE guard (mutex_);
942 
943  if (settings.expand_variables)
944  {
945  key_actual = expand_statement (name);
946  key_ptr = &key_actual;
947  }
948  else
949  key_ptr = &name;
950 
951  // check for null key
952  if (*key_ptr == "")
953  return;
954 
955  functions_[*key_ptr] = Function (func);
956 }
957 
// Registers a function pointer that also receives the function's name as its
// first argument. The declarator line is absent from this listing.
// Holds mutex_ for the whole call. The name is passed through
// expand_statement when settings.expand_variables is set; an empty name is
// ignored. An existing entry with the same name is replaced.
958 void
960  const std::string & name,
961  knowledge::KnowledgeRecord (*func) (const char * name, FunctionArguments &, Variables &),
962  const KnowledgeReferenceSettings & settings)
963 {
964  // enter the mutex
965  std::string key_actual;
966  const std::string * key_ptr;
967  MADARA_GUARD_TYPE guard (mutex_);
968 
969  if (settings.expand_variables)
970  {
971  key_actual = expand_statement (name);
972  key_ptr = &key_actual;
973  }
974  else
975  key_ptr = &name;
976 
977  // check for null key
978  if (*key_ptr == "")
979  return;
980 
981  functions_[*key_ptr] = Function (func);
982 }
983 
984 #ifdef _MADARA_JAVA_
// Registers a Java callable under a function name; compiled only when
// _MADARA_JAVA_ is defined. The declarator line, which also declares the
// name parameter used below, is absent from this listing.
// Holds mutex_ for the whole call. The name is passed through
// expand_statement when settings.expand_variables is set; an empty name is
// ignored. An existing entry with the same name is replaced.
985 void
987  jobject callable,
988  const KnowledgeReferenceSettings & settings)
989 {
990  // enter the mutex
991  std::string key_actual;
992  const std::string * key_ptr;
993  MADARA_GUARD_TYPE guard (mutex_);
994 
995  if (settings.expand_variables)
996  {
997  key_actual = expand_statement (name);
998  key_ptr = &key_actual;
999  }
1000  else
1001  key_ptr = &name;
1002 
1003  // check for null key
1004  if (*key_ptr == "")
1005  return;
1006 
1007  functions_[*key_ptr] = Function (callable);
1008 }
1009 #endif
1010 
1011 #ifdef _MADARA_PYTHON_CALLBACKS_
// Registers a Python callable under a function name; compiled only when
// _MADARA_PYTHON_CALLBACKS_ is defined. The declarator line, which also
// declares the name parameter used below, is absent from this listing.
// Holds mutex_ for the whole call. The name is passed through
// expand_statement when settings.expand_variables is set; an empty name is
// ignored. An existing entry with the same name is replaced.
1012 void
1014  boost::python::object callable,
1015  const KnowledgeReferenceSettings & settings)
1016 {
1017  // enter the mutex
1018  std::string key_actual;
1019  const std::string * key_ptr;
1020  MADARA_GUARD_TYPE guard (mutex_);
1021 
1022  if (settings.expand_variables)
1023  {
1024  key_actual = expand_statement (name);
1025  key_ptr = &key_actual;
1026  }
1027  else
1028  key_ptr = &name;
1029 
1030  // check for null key
1031  if (*key_ptr == "")
1032  return;
1033 
1034  functions_[*key_ptr] = Function (callable);
1035 }
1036 
1037 #endif
1038 
// Convenience overload: compiles the expression text and registers the
// compiled form under the given name. The declarator line, which also
// declares the name parameter used below, is absent from this listing.
// No guard is taken here; compile () takes the guard itself, and the
// define_function call below is presumably the CompiledExpression overload,
// which also takes it - confirm against the full source.
1039 void
1041  const std::string & expression,
1042  const KnowledgeReferenceSettings & settings)
1043 {
1044  CompiledExpression compiled = compile (expression);
1045  define_function (name, compiled, settings);
1046 }
1047 
// Registers an already compiled expression under a function name. The
// declarator line, which also declares the name parameter used below, is
// absent from this listing.
// Holds mutex_ for the whole call. The name is passed through
// expand_statement when settings.expand_variables is set; an empty name is
// ignored. An existing entry with the same name is replaced.
1048 void
1050  const CompiledExpression & expression,
1051  const KnowledgeReferenceSettings & settings)
1052 {
1053  // enter the mutex
1054  std::string key_actual;
1055  const std::string * key_ptr;
1056  MADARA_GUARD_TYPE guard (mutex_);
1057 
1058  if (settings.expand_variables)
1059  {
1060  key_actual = expand_statement (name);
1061  key_ptr = &key_actual;
1062  }
1063  else
1064  key_ptr = &name;
1065 
1066  // check for null key
1067  if (*key_ptr == "")
1068  return;
1069 
// only the expression tree is stored; the original text (logic) is not kept
1070  functions_[*key_ptr] = Function (expression.expression);
1071 }
1072 
1073 
// Looks up a registered function by name. The declarator line is absent from
// this listing.
// Holds mutex_ for the whole call. Returns 0 for an empty name, otherwise
// the address of functions_[name]; the operator[] lookup inserts a
// default-constructed Function when the name was never defined.
1074 Function *
1076  const std::string & name,
1077  const KnowledgeReferenceSettings & settings)
1078 {
1079  // enter the mutex
1080  std::string key_actual;
1081  const std::string * key_ptr;
1082  MADARA_GUARD_TYPE guard (mutex_);
1083 
1084  if (settings.expand_variables)
1085  {
1086  key_actual = expand_statement (name);
1087  key_ptr = &key_actual;
1088  }
1089  else
1090  key_ptr = &name;
1091 
1092  // check for null key
1093  if (*key_ptr == "")
1094  return 0;
1095 
1096  return &functions_[*key_ptr];
1097 }
1098 
1099 
// Compiles expression text into a CompiledExpression. The lines carrying the
// return type and function name, and the first line of the logging call, are
// absent from this listing.
// The result keeps the original text in logic and the interpreter's output
// in expression. The guard on mutex_ is taken after the log statement and
// held while the interpreter runs.
1102  const std::string & expression)
1103 {
1105  "ThreadSafeContext::compile:" \
1106  " compiling %s\n", expression.c_str ());
1107 
1108  MADARA_GUARD_TYPE guard (mutex_);
1109  CompiledExpression ce;
1110  ce.logic = expression;
1111  ce.expression = interpreter_->interpret (*this, expression);
1112 
1113  return ce;
1114 }
1115 
// Evaluates a compiled expression while holding mutex_ and returns the
// result. The lines carrying the return type and function name are absent
// from this listing. The expression is taken by value.
1118  CompiledExpression expression,
1119  const KnowledgeUpdateSettings & settings)
1120 {
1121  MADARA_GUARD_TYPE guard (mutex_);
1122  return expression.expression.evaluate (settings);
1123 }
1124 
// Evaluates an expression tree node while holding mutex_. The lines carrying
// the return type, function name and the root parameter are absent from this
// listing, as is the statement of the else branch (the result for a null
// root), so that result cannot be stated from here.
1128  const KnowledgeUpdateSettings & settings)
1129 {
1130  MADARA_GUARD_TYPE guard (mutex_);
1131  if (root)
1132  return root->evaluate (settings);
1133  else
1135 }
1136 
1137 #endif // _MADARA_NO_KARL_
1138 
// Collects the variables named subject<start> .. subject<end> (inclusive)
// into target. The declarator line is absent from this listing.
// target is always cleared first; when end < start it stays empty. Each slot
// receives the result of get () for the concatenated name. Returns the
// number of elements placed in target. Holds mutex_ while reading.
// NOTE(review): start and end are unsigned, so with end at the maximum
// unsigned value "start <= end" never becomes false - confirm callers cannot
// pass that.
1139 size_t
1141  const std::string & subject,
1142  unsigned int start,
1143  unsigned int end,
1144  std::vector <KnowledgeRecord> & target)
1145 {
1146  target.clear ();
1147 
1148  // enter the mutex
1149  MADARA_GUARD_TYPE guard (mutex_);
1150 
1151  if (end >= start)
1152  {
1153  target.resize (end - start + 1);
1154 
// i indexes target while start walks the numeric suffix
1155  for (unsigned int i = 0; start <= end; ++start, ++i)
1156  {
1157  std::stringstream buffer;
1158  buffer << subject;
1159  buffer << start;
1160  target[i] = get (buffer.str ());
1161  }
1162  }
1163 
1164  return target.size ();
1165 }
1166 
1167 
1168 size_t
1170  const std::string & expression,
1171  std::map <std::string, knowledge::KnowledgeRecord> & target)
1172 {
1173  target.clear ();
1174 
1175  std::string subject (expression);
1176  bool matches_found (false);
1177 
1178  // remove the wildcard and make this into a subject
1179  if (subject[subject.size () - 1] == '*')
1180  subject.resize (subject.size () - 1);
1181 
1182  // just in case a string implementation does not inline
1183  std::string::size_type subject_size = subject.size ();
1184  const char * subject_ptr = subject.c_str ();
1185 
1186  // enter the mutex
1187  MADARA_GUARD_TYPE guard (mutex_);
1188 
1189  // if expression is blank, assume the user wants all variables
1190  if (expression.size () == 0)
1191  target = map_;
1192  else
1193  {
1194  for (KnowledgeMap::iterator i = map_.begin ();
1195  i != map_.end (); ++i)
1196  {
1197  if (i->first.size () >= subject_size)
1198  {
1199  int result = strncmp (i->first.c_str (), subject_ptr, subject_size);
1200  if (result == 0)
1201  {
1202  // we have a match, add this to the map
1203  target[i->first] = i->second;
1204  matches_found = true;
1205  }
1206  else if (matches_found)
1207  {
1208  // we have already found matches, and now we're not seeing matches
1209  break;
1210  }
1211  }
1212  }
1213  }
1214 
1215 
1216  return target.size ();
1217 }
1218 
1219 
1220 void
1222  const std::string & prefix,
1223  const std::string & suffix,
1224  VariableReferences & matches)
1225 {
1226  // get the first thing that either matches the prefix or is just after it
1227  KnowledgeMap::iterator i = map_.lower_bound (prefix);
1228 
1229  // if we have a valid prefix, then there is more to do
1230  if (i != map_.end ())
1231  {
1232  // keep track of the beginning as we're going to iterate twice
1233  KnowledgeMap::iterator first_match = i;
1234  KnowledgeMap::iterator after_matches = map_.end ();
1235  VariableReferences::iterator match;
1236 
1237  size_t num_matches = 0;
1238 
1239  size_t prefix_length = prefix.length ();
1240 
1241  // Iterate over all of the prefix matches
1242  while (i != map_.end () &&
1243  i->first.compare (0, prefix_length, prefix) == 0)
1244  {
1245  ++i;
1246  if (suffix == "" || utility::ends_with (i->first, suffix))
1247  {
1248  ++num_matches;
1249  }
1250  }
1251 
1252  // save the end point so we can do fewer checks
1253  after_matches = i;
1254 
1255  // resize the matches to the appropriate size
1256  matches.resize (num_matches);
1257 
1258  // now, instead of many resizes, we are just going to resize once and set
1259  i = first_match;
1260  num_matches = 0;
1261 
1262  match = matches.begin ();
1263 
1264  // Reiterate
1265  while (i != after_matches)
1266  {
1267  if (suffix == "" || utility::ends_with (i->first, suffix))
1268  {
1269  match->assign (&*i);
1270  ++match;
1271  }
1272  ++i;
1273  }
1274  }
1275  else
1276  {
1277  matches.clear ();
1278  }
1279 }
1280 
1281 
// Copies the variables under a prefix into result and lists the distinct
// next-level key names that follow the prefix. The declarator line is absent
// from this listing.
// Both output containers are cleared first. An entry is considered when its
// name begins with prefix (every entry when prefix is empty) and, if suffix
// is not empty, ends with suffix. Considered entries are copied into result
// unless just_keys is set. For an entry whose name continues with delimiter
// right after the prefix, the text up to the next delimiter is appended to
// next_keys unless it equals the previously appended key.
// Returns result.size (), which is 0 when just_keys is set.
// Holds mutex_ while map_ is read.
1282 size_t
1284  const std::string & prefix,
1285  const std::string & delimiter,
1286  const std::string & suffix,
1287  std::vector <std::string> & next_keys,
1288  std::map <std::string, knowledge::KnowledgeRecord> & result,
1289  bool just_keys)
1290 {
1291  // clear the user provided maps
1292  next_keys.clear ();
1293  result.clear ();
1294 
1295  // loop tracking for optimizations
1296  bool matches_found (false);
1297  std::string last_key ("");
1298 
1299  // enter the mutex
1300  MADARA_GUARD_TYPE guard (mutex_);
1301 
1302  KnowledgeMap::iterator i = map_.begin ();
1303 
1304  if (prefix != "")
1305  {
1306  i = map_.lower_bound (prefix);
1307  }
1308 
1309  for (; i != map_.end (); ++i)
1310  {
1311  // if the prefix doesn't match
1312  if (prefix != "" && !utility::begins_with (i->first, prefix))
1313  {
1314  // if we had previously matched a prefix, we're done
1315  if (matches_found)
1316  {
1317  break;
1318  }
1319  }
1320  // we have a prefix match
1321  else
1322  {
1323  // set matches found if it hasn't been set previously
1324  if (!matches_found)
1325  {
1326  matches_found = true;
1327  }
1328 
1329  // if the suffix is provided and doesn't match, continue
1330  if (suffix != "" && !utility::ends_with (i->first, suffix))
1331  {
1332  continue;
1333  }
1334 
1335  if (!just_keys)
1336  {
1337  // the key is safe to add to the master map
1338  result[i->first] = i->second;
1339  }
1340 
1341  // determine if there is a next key in the hierarchy
1342  size_t prefix_end = prefix.length () + delimiter.length ();
1343 
1344  std::string current_delimiter = i->first.substr (prefix.length (), delimiter.length ());
1345 
1346  if (current_delimiter == delimiter && i->first.length () > prefix_end)
1347  {
1348  // find the end of the sub key
// when no further delimiter exists key_end is npos and the substr below
// runs to the end of the name
1349  size_t key_end = i->first.find (delimiter, prefix_end);
1350 
1351  // if we haven't seen the subkey, add it
1352  std::string current_key (
1353  i->first.substr (prefix_end, key_end - prefix_end));
// only consecutive duplicates are suppressed
1354  if (current_key != last_key)
1355  {
1356  next_keys.push_back (current_key);
1357  last_key = current_key;
1358  }
1359  }
1360  }
1361  }
1362 
1363 
1364 
1365  return result.size ();
1366 }
1367 
// Removes every entry whose key begins with `prefix` from map_, and then
// removes the corresponding entries from changed_map_ and local_changed_map_.
// The whole operation runs while holding mutex_.
// NOTE(review): the line carrying the function name and the line carrying the
// second parameter are absent from this listing; presumably this is
// ThreadSafeContext::delete_prefix -- confirm against the real source.
1368 void
1370 const std::string & prefix,
1372 {
1373  // enter the mutex
1374  MADARA_GUARD_TYPE guard (mutex_);
1375 
 // [first, second) range of map_ entries whose key starts with prefix
1376  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1377  iters (get_prefix_range (prefix));
1378 
1379  map_.erase (iters.first, iters.second);
1380 
1381  {
1382  // check the changed map
1383  std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1384  changed (changed_map_.lower_bound (prefix.c_str()), changed_map_.end ());
1385 
 // NOTE(review): changed.first is dereferenced below without first being
 // compared to changed_map_.end (); when lower_bound finds nothing (e.g. an
 // empty changed_map_) this dereferences the end iterator.
 // NOTE(review): the meaning of the 4th compare argument depends on the key
 // type of VariableReferenceMap (character count for a C string, start
 // offset for a std::string) -- presumably keys are C strings since
 // lower_bound is given prefix.c_str(); verify.
1386  // does our lower bound actually contain the prefix?
1387  if (prefix.compare (0, prefix.size(), changed.first->first, prefix.size()) == 0)
1388  {
1389  changed.second = changed.first;
1390 
 // NOTE(review): the loop condition dereferences changed.second before
 // testing it against changed_map_.end (); the two tests look like they
 // should be swapped.
1391  // until we find an entry that does not begin with prefix, loop
1392  for (++changed.second;
1393  (prefix.compare (0, prefix.size(), changed.second->first, prefix.size()) == 0) &&
1394  changed.second != changed_map_.end ();
1395  ++changed.second) {}
1396 
1397  changed_map_.erase (changed.first, changed.second);
1398  }
1399  }
1400 
1401  {
1402  // check the local changed map
1403  std::pair<VariableReferenceMap::iterator, VariableReferenceMap::iterator>
1404  local_changed (local_changed_map_.lower_bound (prefix.c_str()),
1405  local_changed_map_.end ());
1406 
1407 
 // NOTE(review): same pattern as the changed_map_ scan above --
 // local_changed.first is dereferenced without an end () check, and the loop
 // below dereferences local_changed.second before its end () check.
1408  // does our lower bound actually contain the prefix?
1409  if (prefix.compare (0, prefix.size(), local_changed.first->first, prefix.size()) == 0)
1410  {
1411  local_changed.second = local_changed.first;
1412 
1413  // until we find an entry that does not begin with prefix, loop
1414  for (++local_changed.second;
1415  (prefix.compare (0, prefix.size(), local_changed.second->first, prefix.size()) == 0) &&
1416  local_changed.second != local_changed_map_.end ();
1417  ++local_changed.second) {}
1418 
1419  local_changed_map_.erase (local_changed.first, local_changed.second);
1420  }
1421  }
1422 }
1423 
// Non-const overload. Returns the half-open iterator range [first, second)
// over map_ that covers the entries whose keys begin with `prefix`.
// An empty prefix yields the whole map (begin, end).
// No lock is taken inside this function.
1424 std::pair<KnowledgeMap::iterator,
1425  KnowledgeMap::iterator>
1427  const std::string &prefix)
1428 {
 // default result: the entire map
1429  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1430  ret(map_.begin(), map_.end());
1431 
1432  // If prefix is empty string, the range stays the entire map
1433  if(prefix.size() > 0)
1434  {
1435  ssize_t psz = prefix.size();
1436 
1437  // Find first element >= prefix; i.e., first match or first with that prefix
1438  ret.second = ret.first = map_.lower_bound(prefix);
1439 
1440  // Advance ret.second until it is just past last element with prefix (or at end)
1441  while(ret.second != map_.end() &&
1442  ret.second->first.compare(0, psz, prefix) == 0)
1443  ++ret.second;
1444  }
1445  return ret;
1446 }
1447 
// Const overload. Returns the half-open const_iterator range [first, second)
// over map_ that covers the entries whose keys begin with `prefix`.
// An empty prefix yields the whole map (begin, end).
// No lock is taken inside this function.
1448 std::pair<KnowledgeMap::const_iterator,
1449  KnowledgeMap::const_iterator>
1451  const std::string &prefix) const
1452 {
 // default result: the entire map
1453  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1454  ret(map_.begin(), map_.end());
1455 
1456  // If prefix is empty string, the range stays the entire map
1457  if(prefix.size() > 0)
1458  {
1459  ssize_t psz = prefix.size ();
1460 
1461  // Find first element >= prefix; i.e., first match or first with that prefix
1462  ret.second = ret.first = map_.lower_bound(prefix);
1463 
1464  // Advance ret.second until it is just past last element with prefix (or at end)
1465  while(ret.second != map_.end() &&
1466  ret.second->first.compare(0, psz, prefix) == 0)
1467  ++ret.second;
1468  }
1469  return ret;
1470 }
1471 
// Returns a new KnowledgeMap built from the entries of map_ whose keys begin
// with `prefix`, taken while holding mutex_.
// NOTE(review): the return-type and function-name lines are absent from this
// listing; presumably this is ThreadSafeContext::to_map -- confirm.
1474  const std::string &prefix) const
1475 {
1476  // enter the mutex
1477  MADARA_GUARD_TYPE guard (mutex_);
1478 
1479  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1480  iters(get_prefix_range(prefix));
1481 
 // deep_iterate is defined elsewhere; presumably it wraps the iterators so
 // that the new map receives deep copies of the records -- TODO confirm.
1482  // RVO should avoid copying this map
1483  return KnowledgeMap(deep_iterate(iters.first), deep_iterate(iters.second));
1484 }
1485 
// Returns a new KnowledgeMap holding the entries of map_ whose keys begin with
// `prefix`, with the first prefix.size() characters removed from each key.
// The source map is read while holding mutex_.
// NOTE(review): the return-type and function-name lines are absent from this
// listing; presumably this is ThreadSafeContext::to_map_stripped -- confirm.
1488  const std::string &prefix) const
1489 {
1490  // enter the mutex
1491  MADARA_GUARD_TYPE guard (mutex_);
1492 
1493  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1494  iters(get_prefix_range(prefix));
1495 
1496  // NRVO should avoid copying this map
1497  KnowledgeMap ret;
1498  for(;iters.first != iters.second; ++iters.first)
1499  {
 // source range is visited in key order, so ret.end() is passed as the
 // insertion hint for each stripped key
1500  ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1501  iters.first->second);
1502  }
1503  return ret;
1504 }
1505 
// ThreadSafeContext::copy (name taken from the log strings below).
// Copies records from `source` into this context's map_.
//   settings.clear_knowledge : when true, map_ is cleared first.
//   settings.predicates      : when non-empty, only keys matching a predicate's
//                              prefix (and suffix, when the suffix is non-empty)
//                              are copied; existing keys are overwritten.
//                              When empty, every entry of source.map_ is
//                              inserted.
// NOTE(review): no guard on mutex_ (of either context) is taken in this
// function body as shown.
// NOTE(review): the heads of the logging calls and the function-name line are
// absent from this listing; only their string arguments remain.
1506 void
1508  const ThreadSafeContext & source,
1509  const KnowledgeRequirements & settings)
1510 {
1512  "ThreadSafeContext::copy:" \
1513  " copying a context\n");
1514 
1515  if (settings.clear_knowledge)
1516  {
1518  "ThreadSafeContext::copy:" \
1519  " clearing knowledge in target context\n");
1520 
1521  map_.clear ();
1522  }
1523 
1524  if (settings.predicates.size () != 0)
1525  {
 // NOTE(review): `auto predicate` copies each predicate by value on every
 // iteration; a const reference would avoid the copy.
1526  for (auto predicate : settings.predicates)
1527  {
 // range of source entries whose key starts with predicate.prefix
1528  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1529  iters(source.get_prefix_range(predicate.prefix));
1530 
1531  if (predicate.suffix == "")
1532  {
1534  "ThreadSafeContext::copy:" \
1535  " matching predicate.prefix=%s\n", predicate.prefix.c_str ());
1536 
1537  for(;iters.first != iters.second; ++iters.first)
1538  {
1540  "ThreadSafeContext::copy:" \
1541  " looking for %s\n", iters.first->first.c_str ());
1542 
 // lower_bound gives either the existing entry or the insertion hint
1543  auto where = map_.lower_bound(iters.first->first);
1544 
1545  if (where == map_.end() || where->first != iters.first->first)
1546  {
1548  "ThreadSafeContext::copy:" \
1549  " inserting %s\n", iters.first->first.c_str ());
1550 
1551  where = map_.emplace_hint(where,
1552  iters.first->first, iters.first->second);
1553  }
1554  else
1555  {
1557  "ThreadSafeContext::copy:" \
1558  " overwriting %s\n", iters.first->first.c_str ());
1559 
1560  where->second = iters.first->second;
1561  }
1562  }
1563  }
1564  else // we need to match a suffix
1565  {
1567  "ThreadSafeContext::copy:" \
1568  " matching predicate.suffix=%s\n", predicate.suffix.c_str ());
1569 
1570  for(;iters.first != iters.second; ++iters.first)
1571  {
 // same insert-or-overwrite logic as the prefix-only branch, applied
 // only to keys that also end with predicate.suffix
1572  if (madara::utility::ends_with (iters.first->first,
1573  predicate.suffix))
1574  {
1576  "ThreadSafeContext::copy:" \
1577  " looking for %s\n", iters.first->first.c_str ());
1578 
1579  auto where = map_.lower_bound(iters.first->first);
1580 
1581  if (where == map_.end() || where->first != iters.first->first)
1582  {
1584  "ThreadSafeContext::copy:" \
1585  " inserting %s\n", iters.first->first.c_str ());
1586 
1587  where = map_.emplace_hint(where,
1588  iters.first->first, iters.first->second);
1589  }
1590  else
1591  {
1593  "ThreadSafeContext::copy:" \
1594  " overwriting %s\n", iters.first->first.c_str ());
1595 
1596  where->second = iters.first->second;
1597  }
1598  } // end suffix match
1599  }
1600  }
1601  }
1602  }
1603  // we need to insert everything from source into this
1604  else
1605  {
1606 
1607  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1608  iters (source.map_.begin (), source.map_.end ());
1609 
1610  for(;iters.first != iters.second; ++iters.first)
1611  {
 // NOTE(review): assuming KnowledgeMap has std::map semantics, insert ()
 // leaves an already-present key untouched, whereas the predicate branches
 // above overwrite existing keys -- confirm which behavior is intended.
 // The map_.begin () hint is also only optimal for the first insertion.
1612  map_.insert (map_.begin (), KnowledgeMap::value_type(
1613  iters.first->first, iters.first->second));
1614  }
1615  }
1616 }
1617 
// Copies records from `source` into this context's map_, overwriting keys
// that already exist.
//   copy_set   : keys to copy; when empty, every entry of source.map_ is copied.
//                Keys not present in source.map_ are silently skipped.
//   clean_copy : when true, map_ is cleared before copying.
// NOTE(review): no guard on mutex_ (of either context) is taken in this
// function body as shown.
// NOTE(review): the function-name line is absent from this listing; presumably
// this is the CopySet overload of ThreadSafeContext::copy -- confirm.
1618 void
1620  const ThreadSafeContext & source,
1621  const CopySet & copy_set,
1622  bool clean_copy)
1623 {
1624  // if we need to clean first, clear the map
1625  if (clean_copy)
1626  map_.clear ();
1627 
1628  // if the copy set is empty, copy everything
1629  if (copy_set.size () == 0)
1630  {
1631  for (KnowledgeMap::const_iterator i = source.map_.begin ();
1632  i != source.map_.end (); ++i)
1633  {
1634  map_[i->first] = (i->second);
1635  }
1636  }
1637  else
1638  {
1639  // we have a copy set, so only copy what the user asked for
1640  for (CopySet::const_iterator key = copy_set.begin ();
1641  key != copy_set.end (); ++key)
1642  {
1643  // check source for existence of the current copy set key
1644  KnowledgeMap::const_iterator i = source.map_.find (key->first);
1645 
1646  // if found, make a copy of the found entry
1647  if (i != source.map_.end ())
1648  {
1649  map_[i->first] = (i->second);
1650  }
1651  }
1652  }
1653 }
1654 
// Convenience overload: builds a default CheckpointSettings with the given
// filename and originator id and forwards to the CheckpointSettings overload
// of save_context. Returns whatever that overload returns.
1655 int64_t
1657  const std::string & filename,
1658  const std::string & id) const
1659 {
1660  CheckpointSettings settings;
1661  settings.filename = filename;
1662  settings.originator = id;
1663 
1664  return save_context (settings);
1665 }
1666 
// ThreadSafeContext::save_context (name taken from the log strings below).
// Serializes the context into settings.filename (opened with "wb"):
// a FileHeader, one transport::MessageHeader describing a single state, then
// every existing record of map_ (optionally filtered by settings.prefixes).
// The assembled buffer is passed through settings.encode before being written.
// Returns meta.size on success, or -1 if the file could not be opened.
// NOTE(review): the heads of the logging calls, the begins_with call head and
// the function-name line are absent from this listing.
1667 int64_t
1669  const CheckpointSettings & settings) const
1670 {
1672  "ThreadSafeContext::save_context:" \
1673  " opening file %s\n", settings.filename.c_str ());
1674 
1675  //int64_t total_written (0);
1676  FILE * file = fopen (settings.filename.c_str (), "wb");
1677 
1678  FileHeader meta;
1679  meta.states = 1;
 // copy at most sizeof (meta.originator) bytes of the originator id.
 // NOTE(review): when the originator is at least that long, strncpy writes no
 // terminating NUL into meta.originator.
1680  strncpy (meta.originator, settings.originator.c_str (),
1681  sizeof (meta.originator) < settings.originator.size () + 1 ?
1682  sizeof (meta.originator) : settings.originator.size () + 1);
1683 
1684  transport::MessageHeader checkpoint_header;
1685 
1686  if (file)
1687  {
1688  int64_t max_buffer (settings.buffer_size);
1689  int64_t buffer_remaining (max_buffer);
1690 
1692  "ThreadSafeContext::save_context:" \
1693  " allocating %d byte buffer\n",
1694  (int)max_buffer);
1695 
1696  utility::ScopedArray <char> buffer = new char [max_buffer];
1697 
1698  char * current = buffer.get_ptr ();
1699 
1701  "ThreadSafeContext::save_context:" \
1702  " generating file meta\n");
1703 
 // both running sizes start out covering the checkpoint header itself
1704  meta.size += checkpoint_header.encoded_size ();
1705  checkpoint_header.size = checkpoint_header.encoded_size ();
1706 
1707  if (settings.override_timestamp)
1708  {
1709  meta.initial_timestamp = settings.initial_timestamp;
1710  meta.last_timestamp = settings.last_timestamp;
1711  }
1712 
 // first pass: written here to advance `current`; both headers are rewritten
 // with their final sizes after the records have been serialized
1713  current = meta.write (current, buffer_remaining);
1714 
1715  if (settings.override_lamport)
1716  {
1717  checkpoint_header.clock = settings.initial_lamport_clock;
1718  }
1719  else
1720  {
 // NOTE(review): clock_ is read here, before mutex_ is acquired below
1721  checkpoint_header.clock = clock_;
1722  }
1723 
1724  current = checkpoint_header.write (current, buffer_remaining);
1725 
1727  "ThreadSafeContext::save_context:" \
1728  " writing records\n");
1729 
1730  // lock the context
1731  MADARA_GUARD_TYPE guard (mutex_);
1732 
1733  for (KnowledgeMap::const_iterator i = map_.begin ();
1734  i != map_.end (); ++i)
1735  {
1736  if (i->second.exists ())
1737  {
1738  // check if the prefix is allowed
1739  if (settings.prefixes.size () > 0)
1740  {
1742  "ThreadSafeContext::save_context:" \
1743  " we have %d prefixes to check against.\n",
1744  (int)settings.prefixes.size ());
1745 
 // stop at the first prefix that matches the record's key
1746  bool prefix_found = false;
1747  for (size_t j = 0;
1748  j < settings.prefixes.size () && !prefix_found; ++j)
1749  {
1751  "ThreadSafeContext::save_context:" \
1752  " checking record %s against prefix %s.\n",
1753  i->first.c_str (),
1754  settings.prefixes[j].c_str ());
1755 
1757  i->first, settings.prefixes[j]))
1758  {
1760  "ThreadSafeContext::save_context:" \
1761  " record has the correct prefix.\n");
1762 
1763  prefix_found = true;
1764  }
1765  }
1766 
1767  if (!prefix_found)
1768  {
1770  "ThreadSafeContext::save_context:" \
1771  " record has the wrong prefix. Rejected.\n");
1772 
1773  continue;
1774  }
1775  }
1776 
 // NOTE(review): encoded_size is not compared against buffer_remaining
 // before the record is written; whether KnowledgeRecord::write guards
 // against running past the buffer is not visible here -- verify.
1777  // get the encoded size of the record for checking buffer boundaries
1778  int64_t encoded_size = i->second.get_encoded_size (i->first);
1779  ++checkpoint_header.updates;
1780  meta.size += encoded_size;
1781  checkpoint_header.size += encoded_size;
1782 
1783  current = i->second.write (current, i->first, buffer_remaining);
1784  }
1785  }
1786 
1787  // write the final sizes
1788  current = meta.write (buffer.get_ptr (), max_buffer);
1789  current = checkpoint_header.write (current, max_buffer);
1790 
1791  // run the buffer through settings.encode (any buffer filters)
1792  int total = settings.encode ((unsigned char *)buffer.get_ptr (),
1793  (int)meta.size, (int)max_buffer);
1794 
1795  // update the meta data at the front
1796  fseek (file, 0, SEEK_SET);
1797 
1799  "ThreadSafeContext::save_context:" \
1800  " encoding with buffer filters: %d:%d bytes written.\n",
1801  (int)meta.size, (int)checkpoint_header.size);
1802 
 // the whole encoded buffer is written as a single item of `total` bytes;
 // the fwrite result is not checked
1803  fwrite (buffer.get_ptr (), (size_t)total, 1, file);
1804 
1805  fclose (file);
1806  }
1807  else
1808  {
1810  "ThreadSafeContext::save_context:" \
1811  " couldn't open context file: %s.\n",
1812  settings.filename.c_str ());
1813 
1814  return -1;
1815  }
1816 
 // the unencoded size is returned, not `total` from settings.encode
1817  return meta.size;
1818 }
1819 
1820 
1821 
// Convenience overload: builds a default CheckpointSettings with the given
// filename and forwards to the CheckpointSettings overload of save_as_karl.
// Returns whatever that overload returns.
1822 int64_t
1824 const std::string & filename) const
1825 {
1826  CheckpointSettings settings;
1827  settings.filename = filename;
1828 
1829  return save_as_karl (settings);
1830 }
1831 
1832 
// ThreadSafeContext::save_as_karl (name taken from the log strings below).
// Writes every existing record of map_ (optionally filtered by
// settings.prefixes) to settings.filename as text, one "key=value;" statement
// per line. String values are wrapped in double quotes, integer/double arrays
// in square brackets, and binary file records are written to a separate file
// next to the output and referenced through "#read_file ('<path>')".
// Returns the number of characters placed in the output, or -1 if the file
// could not be opened.
// NOTE(review): the heads of the logging calls, the begins_with call head and
// the function-name line are absent from this listing.
1833 int64_t
1835 const CheckpointSettings & settings) const
1836 {
1838  "ThreadSafeContext::save_as_karl:" \
1839  " opening file %s\n", settings.filename.c_str ());
1840 
1841  int64_t bytes_written (0);
 // all output is accumulated in `buffer` and written to `file` in one go
1842  std::stringstream buffer;
1843  std::ofstream file;
1844  file.open (settings.filename.c_str ());
1845 
1846  if (file.is_open ())
1847  {
1848  // lock the context
1849  MADARA_GUARD_TYPE guard (mutex_);
1850 
1851  for (KnowledgeMap::const_iterator i = map_.begin ();
1852  i != map_.end (); ++i)
1853  {
1854  if (i->second.exists ())
1855  {
1856  // check if the prefix is allowed
1857  if (settings.prefixes.size () > 0)
1858  {
1860  "ThreadSafeContext::save_as_karl:" \
1861  " we have %d prefixes to check against.\n",
1862  (int)settings.prefixes.size ());
1863 
 // stop at the first prefix that matches the record's key
1864  bool prefix_found = false;
1865  for (size_t j = 0;
1866  j < settings.prefixes.size () && !prefix_found; ++j)
1867  {
1869  "ThreadSafeContext::save_as_karl:" \
1870  " checking record %s against prefix %s.\n",
1871  i->first.c_str (),
1872  settings.prefixes[j].c_str ());
1873 
1875  i->first, settings.prefixes[j]))
1876  {
1878  "ThreadSafeContext::save_as_karl:" \
1879  " the record has the correct prefix.\n");
1880 
1881  prefix_found = true;
1882  }
1883  }
1884 
1885  if (!prefix_found)
1886  {
1888  "ThreadSafeContext::save_as_karl:" \
1889  " the record does not have a correct prefix.\n");
1890 
1891  continue;
1892  }
1893  }
1894 
1895  buffer << i->first;
1896  buffer << "=";
1897 
1898  if (!i->second.is_binary_file_type ())
1899  {
 // opening delimiter, value, then the matching closing delimiter.
 // NOTE(review): string values are streamed as-is; a value containing a
 // double quote is not escaped here.
1900  // record is a non binary file type
1901  if (i->second.is_string_type ())
1902  {
1903  // strings require quotation marks
1904  buffer << "\"";
1905  }
1906  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
1907  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1908  {
1909  // arrays require brackets
1910  buffer << "[";
1911  }
1912 
1913  buffer << i->second;
1914  if (i->second.is_string_type ())
1915  {
1916  // strings require quotation marks
1917  buffer << "\"";
1918  }
1919  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
1920  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1921  {
1922  // arrays require brackets
1923  buffer << "]";
1924  }
1925  }
1926  else
1927  {
 // binary record: dump the payload to <dir of output file>/<key>.jpg
 // (IMAGE_JPEG) or .dat (anything else) and reference that path
1928  buffer << "#read_file ('";
1929 
1930  std::string path = utility::extract_path (settings.filename);
1931 
1932  if (path == "")
1933  path = ".";
1934 
1935  path += "/";
1936  path += i->first;
1937 
1938  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
1939  {
1940  path += ".jpg";
1941  }
1942  else
1943  {
1944  path += ".dat";
1945  }
1946 
1947  utility::write_file (path,
1948  (void *)&(*i->second.file_value_)[0], i->second.size ());
1949  buffer << path;
1950 
1951 
1952  buffer << "')";
1953  }
1954 
1955  buffer << ";\n";
1956  }
1957  }
1958 
1959  std::string result = buffer.str ();
1960  file << result;
1961 
 // size of the generated text; the stream state after the write is not
 // checked
1962  bytes_written = (int64_t) result.size ();
1963 
1964  file.close ();
1965  }
1966  else
1967  {
1969  "ThreadSafeContext::save_as_karl:" \
1970  " couldn't open karl file: %s.\n",
1971  settings.filename.c_str ());
1972 
1973  return -1;
1974  }
1975 
1976  return bytes_written;
1977 }
1978 
1979 
// Convenience overload: builds a default CheckpointSettings with the given
// filename and forwards to the CheckpointSettings overload of save_as_json.
// Returns whatever that overload returns.
1980 int64_t
1982 const std::string & filename) const
1983 {
1984  CheckpointSettings settings;
1985  settings.filename = filename;
1986 
1987  return save_as_json (settings);
1988 }
1989 
1990 
// ThreadSafeContext::save_as_json (name taken from the log strings below).
// Writes every existing record of map_ (optionally filtered by
// settings.prefixes) to settings.filename as a single brace-delimited object of
// "key" : value members. String values are wrapped in double quotes,
// integer/double arrays in square brackets, and binary file records are
// written to a separate file and referenced through "#read_file ('<path>')".
// Returns the number of characters placed in the output, or -1 if the file
// could not be opened.
// NOTE(review): the heads of the logging calls, the begins_with call head and
// the function-name line are absent from this listing.
1991 int64_t
1993 const CheckpointSettings & settings) const
1994 {
1996  "ThreadSafeContext::save_as_json:" \
1997  " opening file %s\n", settings.filename.c_str ());
1998 
1999  int64_t bytes_written (0);
2000 
 // all output is accumulated in `buffer` and written to `file` in one go
2001  std::stringstream buffer;
2002  std::ofstream file;
2003  file.open (settings.filename.c_str ());
2004 
2005  if (file.is_open ())
2006  {
2007  // lock the context
2008  MADARA_GUARD_TYPE guard (mutex_);
2009 
2010  buffer << "{\n";
2011 
2012  for (KnowledgeMap::const_iterator i = map_.begin ();
2013  i != map_.end (); ++i)
2014  {
2015  if (i->second.exists ())
2016  {
2017  // check if the prefix is allowed
2018  if (settings.prefixes.size () > 0)
2019  {
2021  "ThreadSafeContext::save_as_json:" \
2022  " we have %d prefixes to check against.\n",
2023  (int)settings.prefixes.size ());
2024 
 // stop at the first prefix that matches the record's key
2025  bool prefix_found = false;
2026  for (size_t j = 0;
2027  j < settings.prefixes.size () && !prefix_found; ++j)
2028  {
2030  "ThreadSafeContext::save_as_json:" \
2031  " checking record %s against prefix %s.\n",
2032  i->first.c_str (),
2033  settings.prefixes[j].c_str ());
2034 
2036  i->first, settings.prefixes[j]))
2037  {
2039  "ThreadSafeContext::save_as_json:" \
2040  " the record has the correct prefix.\n");
2041 
2042  prefix_found = true;
2043  }
2044  }
2045 
2046  if (!prefix_found)
2047  {
2049  "ThreadSafeContext::save_as_json:" \
2050  " the record does not have a correct prefix.\n");
2051 
2052  continue;
2053  }
2054  }
2055 
 // NOTE(review): neither the key nor string values are escaped, so a
 // double quote or backslash in either produces malformed JSON.
2056  buffer << " \"";
2057  buffer << i->first;
2058  buffer << "\" : ";
2059 
2060  if (!i->second.is_binary_file_type ())
2061  {
2062  // record is a non binary file type
2063  if (i->second.is_string_type ())
2064  {
2065  // strings require quotation marks
2066  buffer << "\"";
2067  }
2068  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2069  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2070  {
2071  // arrays require brackets
2072  buffer << "[";
2073  }
2074 
2075  buffer << i->second;
2076  if (i->second.is_string_type ())
2077  {
2078  // strings require quotation marks
2079  buffer << "\"";
2080  }
2081  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2082  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2083  {
2084  // arrays require brackets
2085  buffer << "]";
2086  }
2087  }
2088  else
2089  {
 // binary record: dump the payload to <dir of output file>/<key>.jpg
 // (IMAGE_JPEG) or .dat (anything else) and reference that path.
 // NOTE(review): the #read_file (...) text is emitted unquoted, which is
 // not a valid JSON value.
2090  buffer << "#read_file ('";
2091 
2092  std::string path = utility::extract_path (settings.filename);
2093 
2094  if (path == "")
2095  path = ".";
2096 
2097  path += "/";
2098  path += i->first;
2099 
2100  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
2101  {
2102  path += ".jpg";
2103  }
2104  else
2105  {
2106  path += ".dat";
2107  }
2108 
2109  utility::write_file (path,
2110  (void *)&(*i->second.file_value_)[0], i->second.size ());
2111  buffer << path;
2112 
2113 
2114  buffer << "')";
2115  }
2116 
 // NOTE(review): the separator is emitted whenever any map entry follows,
 // not whenever another entry will actually be written. If all remaining
 // entries are skipped (exists () false or prefix mismatch) the output
 // ends with a trailing comma.
2117  KnowledgeMap::const_iterator j (i);
2118 
2119  if (++j != map_.end ())
2120  buffer << ",\n";
2121  }
2122  }
2123 
2124  buffer << "\n}\n";
2125 
2126  std::string result = buffer.str ();
2127  file << result;
2128 
 // size of the generated text; the stream state after the write is not
 // checked
2129  bytes_written = (int64_t) result.size ();
2130 
2131  file.close ();
2132  }
2133  else
2134  {
2136  "ThreadSafeContext::save_as_json:" \
2137  " couldn't open json file: %s.\n",
2138  settings.filename.c_str ());
2139 
2140  return -1;
2141  }
2142 
2143  return bytes_written;
2144 }
2145 
2146 
2147 
// Convenience overload: builds a CheckpointSettings from the given filename
// and id and forwards to the CheckpointSettings overload of load_context.
// Returns whatever that overload returns.
// NOTE(review): `id` is a non-const reference but is only read in this
// function body; the originator loaded into checkpoint_settings is not copied
// back into it here.
2148 int64_t
2150  const std::string & filename, std::string & id,
2151  const KnowledgeUpdateSettings & settings)
2152 {
2153  CheckpointSettings checkpoint_settings;
2154  checkpoint_settings.filename = filename;
2155  checkpoint_settings.originator = id;
2156 
2157  return load_context (checkpoint_settings, settings);
2158 }
2159 
// ThreadSafeContext::load_context (name taken from the log strings below).
// Overload that also hands the file's FileHeader back to the caller: it reads
// the start of `filename`, fills `meta` when the header test passes, and then
// delegates the actual loading to the CheckpointSettings overload.
// Returns the result of that delegated call.
// NOTE(review): the file is opened and read twice (once here, once in the
// delegated call), and the delegated call runs even when the file could not
// be opened or the header was invalid; in those cases `meta` is left untouched.
// NOTE(review): the heads of the logging calls and the function-name line are
// absent from this listing.
2160 int64_t
2162  const std::string & filename,
2163  FileHeader & meta,
2164  const KnowledgeUpdateSettings & settings)
2165 {
2167  "ThreadSafeContext::load_context:" \
2168  " opening file %s for just header info\n", filename.c_str ());
2169 
2170  // read the initial FileHeader in
2171  FILE * file = fopen (filename.c_str (), "rb");
2172 
2173  int64_t total_read (0);
2174 
2175  if (file)
2176  {
 // fixed-size scratch buffer; only the leading FileHeader is needed here
2177  int64_t max_buffer (102800);
2178  int64_t buffer_remaining (max_buffer);
2179 
2180  utility::ScopedArray <char> buffer = new char[max_buffer];
2181  const char * current = buffer.get_ptr ();
2182 
2184  "ThreadSafeContext::load_context:" \
2185  " reading file meta data\n");
2186 
2187  total_read = fread (buffer.get_ptr (),
2188  1, max_buffer, file);
2189  buffer_remaining = (int64_t)total_read;
2190 
 // require strictly more bytes than one FileHeader plus a passing header test
2191  if (total_read > FileHeader::encoded_size () &&
2192  FileHeader::file_header_test (current))
2193  {
2194  // if there was something in the file, and it was the right header
2195 
 // the returned pointer is stored but not used afterwards in this function
2196  current = meta.read (current, buffer_remaining);
2197 
2198  } // end if total_read > 0
2199  else
2200  {
2202  "ThreadSafeContext::load_context:" \
2203  " invalid file. No contextual change.\n");
2204  }
2205 
2206  fclose (file);
2207  }
2208  else
2209  {
2211  "ThreadSafeContext::load_context:" \
2212  " could not open file %s for reading. "
2213  "Check that file exists and that permissions are appropriate.\n",
2214  filename.c_str ());
2215  }
2216 
 // delegate the real load; default CheckpointSettings apart from the filename
2217  CheckpointSettings checkpoint_settings;
2218  checkpoint_settings.filename = filename;
2219 
2220  return load_context (checkpoint_settings, settings);
2221 }
2222 
2223 
// ThreadSafeContext::load_context (name taken from the log strings below).
// Loads a checkpoint file into this context.
//   checkpoint_settings : supplies filename, buffer_size, clear_knowledge,
//                         prefixes and the [initial_state, last_state] window;
//                         on a valid file its timestamps, originator, states,
//                         version and lamport clocks are overwritten from the
//                         file's headers.
//   update_settings     : forwarded to update_record_from_external for every
//                         accepted record.
// Returns the total number of bytes obtained through fread (0 when the file
// could not be opened).
// NOTE(review): the heads of the logging calls, the begins_with call head, the
// right-hand side of the header-size comparison, the declaration of `record`
// and the function-name line are absent from this listing.
2224 int64_t
2226  CheckpointSettings & checkpoint_settings,
2227  const KnowledgeUpdateSettings & update_settings)
2228 {
2230  "ThreadSafeContext::load_context:" \
2231  " opening file %s\n", checkpoint_settings.filename.c_str ());
2232 
2233  FILE * file = fopen (checkpoint_settings.filename.c_str (), "rb");
2234 
2235  int64_t total_read (0);
2236 
 // NOTE(review): the clear happens before the file is known to be open or
 // valid, so a missing/invalid file still wipes the existing knowledge.
2237  if (checkpoint_settings.clear_knowledge)
2238  {
2239  this->clear ();
2240  }
2241 
2242  if (file)
2243  {
2244  FileHeader meta;
2245  int64_t max_buffer (checkpoint_settings.buffer_size);
2246  int64_t buffer_remaining (max_buffer);
2247 
2248  utility::ScopedArray <char> buffer = new char[max_buffer];
2249  const char * current = buffer.get_ptr ();
2250 
2251  total_read = fread (buffer.get_ptr (),
2252  1, max_buffer, file);
2253  buffer_remaining = (int64_t)total_read;
2254 
2256  "ThreadSafeContext::load_context:" \
2257  " reading file: %d bytes read.\n",
2258  (int)total_read);
2259 
 // NOTE(review): the value returned by decode is discarded, so
 // buffer_remaining keeps the pre-decode byte count.
2260  // call decode with any buffer filters
2261  checkpoint_settings.decode ((unsigned char *)buffer.get_ptr (),
2262  (int)(total_read), (int)max_buffer);
2263 
2264  if (total_read > FileHeader::encoded_size () &&
2265  FileHeader::file_header_test (current))
2266  {
2267  // if there was something in the file, and it was the right header
2268 
2269  current = meta.read (current, buffer_remaining);
2270 
 // publish the file-level header fields to the caller
2271  checkpoint_settings.initial_timestamp = meta.initial_timestamp;
2272  checkpoint_settings.last_timestamp = meta.last_timestamp;
2273  checkpoint_settings.originator = meta.originator;
2274  checkpoint_settings.states = meta.states;
2275  checkpoint_settings.version = utility::to_string_version (
2276  meta.karl_version);
2277 
2279  "ThreadSafeContext::load_context:" \
2280  " read File meta. Meta.size=%d\n", (int)meta.size);
2281 
2287  if (meta.states > 0)
2288  {
 // walk the states in file order, stopping after last_state
2289  for (uint64_t state = 0; state < meta.states &&
2290  state <= checkpoint_settings.last_state; ++state)
2291  {
2292  if (buffer_remaining > (int64_t)
2294  {
2295  transport::MessageHeader checkpoint_header;
2296 
2297  current = checkpoint_header.read (current, buffer_remaining);
2298 
2299  if (state == 0)
2300  {
2301  checkpoint_settings.initial_lamport_clock = checkpoint_header.clock;
2302  }
2303 
 // only reached for the file's final state if the loop gets that far
 // (i.e. last_state does not cut the loop short)
2304  if (state == meta.states - 1)
2305  {
2306  checkpoint_settings.last_lamport_clock = checkpoint_header.clock;
2307  }
2308 
 // payload size of this state: header.size minus the header itself
2309  uint64_t updates_size = checkpoint_header.size -
2310  checkpoint_header.encoded_size ();
2311 
2313  "ThreadSafeContext::load_context:" \
2314  " read Checkpoint header. header.size=%d, updates.size=%d\n",
2315  (int)checkpoint_header.size, (int)updates_size);
2316 
 // the state's payload does not fit in what is left of the buffer:
 // move the unread tail into a larger buffer and read the rest from file
2322  if (updates_size > (uint64_t)buffer_remaining)
2323  {
2328  utility::ScopedArray <char> new_buffer =
2329  new char[updates_size];
2330  memcpy (new_buffer.get_ptr (), current,
2331  (size_t)buffer_remaining);
2332 
 // NOTE(review): updates_size already excludes the header size, yet
 // encoded_size () is subtracted again from the byte count requested
 // here -- confirm this is intended.
2333  // read the rest of checkpoint into new buffer
2334  total_read += fread (new_buffer.get_ptr () + buffer_remaining, 1,
2335  updates_size
2336  - (uint64_t)buffer_remaining
2337  - checkpoint_header.encoded_size (), file);
2338 
 // NOTE(review): ownership semantics of assigning one ScopedArray to
 // another are defined elsewhere -- verify `buffer = new_buffer` does
 // not leave both objects owning the same allocation.
2339  // update other variables
2340  max_buffer = updates_size;
2341  buffer_remaining = checkpoint_header.size
2342  - checkpoint_header.encoded_size ();
2343  current = new_buffer.get_ptr ();
2344  buffer = new_buffer;
2345  } // end if allocation is needed
2346 
2348  "ThreadSafeContext::load_context:" \
2349  " state=%d, initial_state=%d, last_state=%d\n",
2350  (int)state, (int)checkpoint_settings.initial_state,
2351  (int)checkpoint_settings.last_state);
2352 
 // apply records only for states inside the requested window
2353  if (state <= checkpoint_settings.last_state &&
2354  state >= checkpoint_settings.initial_state)
2355  {
2356  for (uint32_t update = 0;
2357  update < checkpoint_header.updates; ++update)
2358  {
2359  std::string key;
2361  current = record.read (current, key, buffer_remaining);
2362 
2364  "ThreadSafeContext::load_context:" \
2365  " read record (%d of %d): %s\n",
2366  (int)update, (int)checkpoint_header.updates, key.c_str ());
2367 
2368  // check if the prefix is allowed
2369  if (checkpoint_settings.prefixes.size () > 0)
2370  {
2371  bool prefix_found = false;
2372  for (size_t j = 0; j < checkpoint_settings.prefixes.size ()
2373  && !prefix_found; ++j)
2374  {
2376  "ThreadSafeContext::load_context:" \
2377  " checking record %s against prefix %s\n",
2378  key.c_str (), checkpoint_settings.prefixes[j].c_str ());
2379 
2381  key, checkpoint_settings.prefixes[j]))
2382  {
2384  "ThreadSafeContext::load_context:" \
2385  " record has the correct prefix.\n");
2386 
2387  prefix_found = true;
2388  } // end if prefix success
2389  } // end for all prefixes
2390 
2391  if (!prefix_found)
2392  {
2394  "ThreadSafeContext::load_context:" \
2395  " record does not have the correct prefix. Rejected.\n");
2396 
2397  continue;
2398  } // end if prefix found
2399  } // end if there are prefixes in the checkpoint settings
2400 
2401  update_record_from_external (key, record, update_settings);
2402  } // end for all updates
2403  } // end if state is within acceptable range
2404  else
2405  {
2407  "ThreadSafeContext::load_context:" \
2408  " not a valid state, incrementing by %d bytes.\n",
2409  (int)updates_size);
2410 
 // NOTE(review): `current` is advanced past the skipped state but
 // buffer_remaining is not reduced by the same amount.
2411  current += updates_size;
2412  }
2413  } // end if enough buffer for reading a message header
2414 
 // buffer fully consumed but the file has more data: refill from the start
2415  if (buffer_remaining == 0 && (uint64_t)total_read < meta.size)
2416  {
2417  buffer_remaining = max_buffer;
2418  current = buffer.get_ptr ();
2419  total_read += fread (buffer.get_ptr (), 1, buffer_remaining, file);
2420  }
2421  } // end for loop of states
2422  }
2423  } // end if total_read > 0
2424  else
2425  {
2427  "ThreadSafeContext::load_context:" \
2428  " invalid file. No contextual change.\n");
2429  }
2430 
2431  fclose (file);
2432  }
2433  else
2434  {
2436  "ThreadSafeContext::load_context:" \
2437  " could not open file %s for reading. "
2438  "Check that file exists and that permissions are appropriate.\n",
2439  checkpoint_settings.filename.c_str ());
2440  }
2441 
2442  return total_read;
2443 }
2444 
2445 
2446 int64_t
2448  const CheckpointSettings & settings) const
2449 {
2451  "ThreadSafeContext::save_checkpoint:" \
2452  " opening file %s\n", settings.filename.c_str ());
2453 
2454  int64_t total_written (0);
2455  FILE * file = fopen (settings.filename.c_str (), "rb+");
2456 
2457  FileHeader meta;
2458  transport::MessageHeader checkpoint_header;
2459 
2460  if (file)
2461  {
2462  int64_t max_buffer (settings.buffer_size);
2463  int64_t buffer_remaining (max_buffer);
2464  utility::ScopedArray <char> buffer = new char [max_buffer];
2465 
2466  char * current = buffer.get_ptr ();
2467  const char * meta_reader = current;
2468 
2469  // read the meta data at the front
2470  fseek (file, 0, SEEK_SET);
2471  size_t ret = fread (current, meta.encoded_size (), 1, file);
2472  if (ret == 0) {
2474  "ThreadSafeContext::save_checkpoint:" \
2475  " failed to read existing file header: size=%d\n",
2476  (int)meta.encoded_size ());
2477 
2478  return -1;
2479  }
2480 
2481  meta_reader = meta.read (meta_reader, buffer_remaining);
2482 
2484  "ThreadSafeContext::save_checkpoint:" \
2485  " init file meta: size=%d, states=%d\n",
2486  (int)meta.size, (int)meta.states);
2487 
2488  if (settings.originator != "")
2489  {
2491  "ThreadSafeContext::save_checkpoint:" \
2492  " setting file meta id to %s\n",
2493  settings.originator.c_str ());
2494 
2495  strncpy (meta.originator, settings.originator.c_str (),
2496  sizeof (meta.originator) < settings.originator.size () + 1 ?
2497  sizeof (meta.originator) : settings.originator.size () + 1);
2498  }
2499 
2500  // save the spot where the file ends
2501  uint64_t checkpoint_start = meta.size;
2502 
2503  checkpoint_header.size = checkpoint_header.encoded_size ();
2504 
2506  "ThreadSafeContext::save_checkpoint:" \
2507  " meta.size=%d, chkpt.header.size=%d \n",
2508  (int)meta.size, (int)checkpoint_header.size);
2509 
2510  if (settings.override_timestamp)
2511  {
2512  meta.initial_timestamp = settings.initial_timestamp;
2513  meta.last_timestamp = settings.last_timestamp;
2514  }
2515 
2516  if (settings.override_lamport)
2517  {
2518  checkpoint_header.clock = settings.initial_lamport_clock;
2519  }
2520  else
2521  {
2522  checkpoint_header.clock = clock_;
2523  }
2524 
2525  const knowledge::VariableReferenceMap & local_records = this->get_local_modified ();
2526 
2527  if (local_records.size () != 0)
2528  {
2529  // skip over the checkpoint header. We'll write this later with the records
2530 
2532  "ThreadSafeContext::save_checkpoint:" \
2533  " fseek set to %d\n",
2534  (int)(checkpoint_start));
2535 
2536  // set the file pointer to the checkpoint header start
2537  fseek (file, (long)checkpoint_start, SEEK_SET);
2538 
2539  // start updates just past the checkpoint header's buffer location
2540  current = checkpoint_header.write (buffer.get_ptr (), buffer_remaining);
2541 
2543  "ThreadSafeContext::save_checkpoint:" \
2544  " chkpt.header.size=%d, current->buffer delta=%d\n",
2545  (int) checkpoint_header.encoded_size (),
2546  (int)(current - buffer.get_ptr ()));
2547 
2548  {
2549  // lock the context
2550  MADARA_GUARD_TYPE guard (mutex_);
2551 
2552  for (const auto &e : local_records)
2553  {
2554  auto record = e.second.get_record_unsafe();
2555 
2556  if (record->exists ())
2557  {
2558  // check if the prefix is allowed
2559  if (settings.prefixes.size () > 0)
2560  {
2561  bool prefix_found = false;
2562  for (size_t j = 0;
2563  j < settings.prefixes.size () && !prefix_found; ++j)
2564  {
2566  record->to_string (), settings.prefixes[j]))
2567  {
2568  prefix_found = true;
2569  }
2570  } // end for j->prefixes.size
2571 
2572  if (!prefix_found)
2573  continue;
2574  } // end if prefixes exists
2575 
2576  // get the encoded size of the record for checking buffer boundaries
2577  int64_t encoded_size = record->get_encoded_size (e.first);
2578 
2580  "ThreadSafeContext::save_checkpoint:" \
2581  " estimated encoded size of update=%d bytes\n",
2582  (int)encoded_size);
2583 
2584  if (encoded_size > buffer_remaining)
2585  {
2586  fwrite (current,
2587  (size_t)(max_buffer - buffer_remaining), 1, file);
2588  total_written += (int64_t)(max_buffer - buffer_remaining);
2589  buffer_remaining = max_buffer;
2590 
2592  "ThreadSafeContext::save_checkpoint:" \
2593  " encoded_size larger than remaining buffer. Flushing\n");
2594 
2595  if (encoded_size > max_buffer)
2596  {
2601  buffer = new char[encoded_size];
2602  max_buffer = encoded_size;
2603  buffer_remaining = max_buffer;
2604  current = buffer.get_ptr ();
2605 
2607  "ThreadSafeContext::save_checkpoint:" \
2608  " encoded_size larger than entire buffer. Reallocating\n");
2609  } // end if larger than buffer
2610  } // end if larger than buffer remaining
2611 
2612  current = record->write (current, e.first, buffer_remaining);
2613 
2614  checkpoint_header.size += (uint64_t)encoded_size;
2615  ++checkpoint_header.updates;
2616 
2618  "ThreadSafeContext::save_checkpoint:" \
2619  " chkpt.header.size=%d, current->buffer delta=%d\n",
2620  (int)checkpoint_header.size, (int)(current - buffer.get_ptr ()));
2621  } // if record exists
2622  } // for all records
2623 
2624  ++meta.states;
2625 
2626  if (settings.reset_checkpoint)
2627  {
2629  "ThreadSafeContext::save_checkpoint:" \
2630  " resetting checkpoint. Next checkpoint starts fresh here\n");
2631 
2632  reset_checkpoint ();
2633  }
2634  } // end scoped Guard for context
2635 
2637  "ThreadSafeContext::save_checkpoint:" \
2638  " writing final data for state #%d\n",
2639  (int)meta.states);
2640 
2641  if (buffer_remaining != max_buffer)
2642  {
2643  fwrite (buffer.get_ptr (),
2644  (size_t)(current - buffer.get_ptr ()), 1, file);
2645  total_written += (size_t)(current - buffer.get_ptr ());
2646 
2648  "ThreadSafeContext::save_checkpoint:" \
2649  " current->buffer=%d bytes, max->remaining=%d bytes\n",
2650  (int)(current - buffer.get_ptr ()), (int)max_buffer - buffer_remaining);
2651  }
2652 
2654  "ThreadSafeContext::save_checkpoint:" \
2655  " chkpt.header: size=%d, updates=%d\n",
2656  (int)checkpoint_header.size, (int)checkpoint_header.updates);
2657 
2658  buffer_remaining = max_buffer;
2659  fseek (file, (long)checkpoint_start, SEEK_SET);
2660  current = checkpoint_header.write (buffer.get_ptr (), buffer_remaining);
2661  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
2662 
2663  meta.size += checkpoint_header.size;
2664 
2666  "ThreadSafeContext::save_checkpoint:" \
2667  " new file meta: size=%d, states=%d, lastchkpt.size=%d\n",
2668  (int)meta.size, (int)meta.states, (int)checkpoint_header.size);
2669 
2670  // update the meta data at the front
2671  fseek (file, 0, SEEK_SET);
2672 
2674  "ThreadSafeContext::save_checkpoint:" \
2675  " updating file meta data in the file\n");
2676 
2677  buffer_remaining = max_buffer;
2678  current = meta.write (buffer.get_ptr (), buffer_remaining);
2679 
2680  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
2681 
2682  } // if there are local checkpointing records
2683 
2684  fclose (file);
2685  } // if file is opened
2686  else
2687  {
2689  "ThreadSafeContext::save_checkpoint:" \
2690  " checkpoint doesn't exist. Creating.\n");
2691 
2692  // someone wants to save the checkpoint diff to a new file
2693  file = fopen (settings.filename.c_str (), "wb");
2694 
2695  meta.states = 1;
2696  strncpy (meta.originator, settings.originator.c_str (),
2697  sizeof (meta.originator) < settings.originator.size () + 1 ?
2698  sizeof (meta.originator) : settings.originator.size () + 1);
2699 
2700  if (file)
2701  {
2702  int64_t max_buffer (settings.buffer_size);
2703  int64_t buffer_remaining (max_buffer);
2704  utility::ScopedArray <char> buffer = new char [max_buffer];
2705 
2706  char * current = buffer.get_ptr ();
2707 
2709  "ThreadSafeContext::save_checkpoint:" \
2710  " creating file meta. file.meta.size=%d, state.size=%d\n",
2711  (int)meta.size, (int)checkpoint_header.encoded_size ());
2712 
2713  meta.size += checkpoint_header.encoded_size ();
2714  checkpoint_header.size = checkpoint_header.encoded_size ();
2715 
2716  if (settings.override_timestamp)
2717  {
2718  meta.initial_timestamp = settings.initial_timestamp;
2719  meta.last_timestamp = settings.last_timestamp;
2720  }
2721 
2722  current = meta.write (current, buffer_remaining);
2723 
2724  if (settings.override_lamport)
2725  {
2726  checkpoint_header.clock = settings.initial_lamport_clock;
2727  }
2728  else
2729  {
2730  checkpoint_header.clock = clock_;
2731  }
2732 
2733  current = checkpoint_header.write (current, buffer_remaining);
2734 
2736  "ThreadSafeContext::save_checkpoint:" \
2737  " writing diff records\n");
2738 
2739  // lock the context
2740  MADARA_GUARD_TYPE guard (mutex_);
2741 
2742  const knowledge::VariableReferenceMap & local_records = this->get_local_modified ();
2743 
2744  for (const auto &e : local_records)
2745  {
2746  auto record = e.second.get_record_unsafe();
2747 
2748  if (record->exists ())
2749  {
2750  // check if the prefix is allowed
2751  if (settings.prefixes.size () > 0)
2752  {
2754  "ThreadSafeContext::save_checkpoint:" \
2755  " we have %d prefixes to check against.\n",
2756  (int)settings.prefixes.size ());
2757 
2758  bool prefix_found = false;
2759  for (size_t j = 0;
2760  j < settings.prefixes.size () && !prefix_found; ++j)
2761  {
2763  "ThreadSafeContext::save_checkpoint:" \
2764  " checking record %s against prefix %s.\n",
2765  e.first,
2766  settings.prefixes[j].c_str ());
2767 
2769  e.first, settings.prefixes[j]))
2770  {
2772  "ThreadSafeContext::save_checkpoint:" \
2773  " record has the correct prefix.\n");
2774 
2775  prefix_found = true;
2776  }
2777  }
2778 
2779  if (!prefix_found)
2780  {
2782  "ThreadSafeContext::save_checkpoint:" \
2783  " record has the wrong prefix. Rejected.\n");
2784 
2785  continue;
2786  }
2787  }
2788 
2789  // get the encoded size of the record for checking buffer boundaries
2790  int64_t encoded_size = record->get_encoded_size (e.first);
2791 
2793  "ThreadSafeContext::save_checkpoint:" \
2794  " estimated encoded size of update=%d bytes\n",
2795  (int)encoded_size);
2796 
2797  ++checkpoint_header.updates;
2798  meta.size += encoded_size;
2799  checkpoint_header.size += encoded_size;
2800 
2801  current = record->write (current, e.first, buffer_remaining);
2802 
2804  "ThreadSafeContext::save_checkpoint:" \
2805  " current->buffer delta=%d bytes\n",
2806  (int)(current - buffer.get_ptr ()));
2807 
2808  }
2809  }
2810 
2811  // write the final sizes
2812  current = meta.write (buffer.get_ptr (), max_buffer);
2813  current = checkpoint_header.write (current, max_buffer);
2814 
2815  // call encode with any buffer filters
2816  int total = settings.encode ((unsigned char *)buffer.get_ptr (),
2817  (int)meta.size, (int)max_buffer);
2818 
2819  // update the meta data at the front
2820  fseek (file, 0, SEEK_SET);
2821 
2823  "ThreadSafeContext::save_checkpoint:" \
2824  " file size: %d bytes written (file:%d, state.size:%d).\n",
2825  (int)total, (int)meta.size, (int)checkpoint_header.size);
2826 
2827  fwrite (buffer.get_ptr (), (size_t)total, 1, file);
2828 
2829  fclose (file);
2830 
2831  if (settings.reset_checkpoint)
2832  {
2833  reset_checkpoint ();
2834  }
2835 
2836  } // if the new file creation for wb was successful
2837  else
2838  {
2840  "ThreadSafeContext::save_checkpoint:" \
2841  " couldn't create checkpoint file: %s.\n",
2842  settings.filename.c_str ());
2843 
2844  return -1;
2845  }
2846  } // end if we need to create a new file
2847 
2848  return checkpoint_header.size;
2849 }
2850 
2851 
// Convenience overload of save_checkpoint that takes a filename and an
// originator id instead of a CheckpointSettings object.
//
// NOTE(review): the declarator line (listing line 2853) is absent from this
// extracted listing; the cross-reference index at the end of the page gives
// the signature as
//   int64_t save_checkpoint(const std::string &filename,
//                           const std::string &id="") const
//
// @param filename  copied into settings.filename
// @param id        copied into settings.originator
// @return          the value returned by save_checkpoint (settings)
2852 int64_t
2854  const std::string & filename,
2855  const std::string & id) const
2856 {
// only filename and originator are set below; every other field keeps the
// value given by CheckpointSettings' default construction
2857  CheckpointSettings settings;
2858  settings.filename = filename;
2859  settings.originator = id;
2860 
// delegate to the CheckpointSettings-based overload
2861  return save_checkpoint (settings);
2862 }
2863 
2864 } }
2865 
This class encapsulates an entry in a KnowledgeBase.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false) ...
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a FileHeader instance from a buffer and updates the amount of buffer room remaining.
Definition: FileHeader.cpp:40
std::string version
the MADARA version
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
Defines a file header which is the default for KaRL checkpointing.
Definition: FileHeader.h:36
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > expansion_splitters_
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
This class stores a function definition.
Definition: Functions.h:44
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
uint32_t quality
priority of the update
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
madara::knowledge::KnowledgeRecord KnowledgeRecord
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
void mark_and_signal(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
uint32_t updates
the number of knowledge variable updates in the message
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining...
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &settings)
Copies variables and values from source to this context.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord. Do not use this pointer unless you've locked the ...
char originator[64]
the originator of the message (host:port)
Definition: FileHeader.h:129
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states).
uint64_t initial_timestamp
the timestamp for the initial checkpointing
Definition: FileHeader.h:109
int encode(unsigned char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
This class stores variables and their values for use by any entity needing state information in a thr...
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
uint64_t last_timestamp
the timestamp for the last checkpoint
Definition: FileHeader.h:114
void print(unsigned int level) const
Atomically prints all variables and values in the context.
int decode(unsigned char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t size
the size of this header plus the updates
Definition: FileHeader.h:99
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
std::string extract_path(const std::string &name)
Extracts the path of a filename.
Definition: Utility.cpp:380
Holds settings for checkpoints to load or save.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
std::string logic
the logic that was compiled
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
uint32_t karl_version
Version of KaRL installed when file was created.
Definition: FileHeader.h:124
Optimized reference to a variable within the knowledge base.
Compiled, optimized KaRL logic.
std::vector< KnowledgeRecord > FunctionArguments
static uint32_t encoded_size(void)
Returns the size of the encoded FileHeader class, which may be different from sizeof (FileHeader) bec...
Definition: FileHeader.cpp:32
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy clock and write_quality.
std::string originator
the originator id of the checkpoint
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
madara::expression::ExpressionTree expression
the expression tree
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
static struct madara::knowledge::tags::string_t string
DeepIterator< Iterator > deep_iterate(const Iterator &i)
Returns an input iterator from an iterator.
Definition: DeepIterator.h:179
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
FunctionMap functions_
map of function names to functions
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoint...
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Definition: Utility.cpp:68
Holds settings requirements for knowledge, usually in copying.
uint64_t size
the size of this header plus the updates
::std::map< std::string, KnowledgeRecord > KnowledgeMap
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
uint64_t states
the number of states checkpointed in the file stream
Definition: FileHeader.h:104
const VariableReferenceMap & get_local_modified(void) const
Retrieves a list of modified local variables.
An abstract base class that defines a simple abstract implementation of an expression tree node...
Definition: ComponentNode.h:36
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
static bool file_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
Definition: FileHeader.h:91
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
std::vector< VariableReference > VariableReferences
a vector of variable references
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a plaintext string.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
KnowledgeMap::value_type * pair_ptr
logger::Logger * logger_
Logger for printing.
Provides functions and classes for the distributed knowledge base.
uint64_t states
the number of states checkpointed in the file stream
uint64_t last_timestamp
final wallclock time saved in the checkpoint
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints ...
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
void reset_checkpoint(void) const
Reset all checkpoint variables in the modified lists.
uint64_t clock
the clock of the sender when the message was generated
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
Settings for applying knowledge updates.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
Copyright (c) 2015 Carnegie Mellon University.
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
MADARA_EXPORT bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:317
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
Settings for applying knowledge updates.
bool reset_checkpoint
If true, resets the checkpoint to start a new diff from this point forward.
Provides an interface for external functions into the MADARA KaRL variable settings.
ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
Definition: Utility.cpp:547
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
MADARA_EXPORT bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
Definition: Utility.inl:331
uint64_t clock
last modification time
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...