MADARA  3.4.1
ThreadSafeContext.inl
Go to the documentation of this file.
1 #ifndef _MADARA_THREADSAFECONTEXT_INL_
2 #define _MADARA_THREADSAFECONTEXT_INL_
3 
13 
14 #include <sstream>
15 
16 namespace madara
17 {
18 namespace knowledge
19 {
21  const std::string& filename, const KnowledgeUpdateSettings& settings)
22 {
23  VariableReference variable = get_ref(key, settings);
24  return read_file(variable, filename, settings);
25 }
26 
28  const std::string& key, const KnowledgeReferenceSettings& settings) const
29 {
30  const KnowledgeRecord* ret = with(key, settings);
31  if (ret)
32  {
33  if (settings.exception_on_unitialized && !ret->exists())
34  {
35  std::stringstream buffer;
36  buffer << "ERROR: settings do not allow reads of unset vars and ";
37  buffer << key << " is uninitialized";
38  throw exceptions::UninitializedException (buffer.str ());
39  }
40 
41  if (ret->has_history())
42  {
43  return ret->get_newest();
44  }
45  else
46  {
47  return *ret;
48  }
49  }
50  else
51  {
52  if (settings.exception_on_unitialized)
53  {
54  std::stringstream buffer;
55  buffer << "ERROR: settings do not allow reads of unset vars and ";
56  buffer << key << " is uninitialized";
57  throw exceptions::UninitializedException (buffer.str ());
58  }
59  }
60  return KnowledgeRecord();
61 }
62 
64  const KnowledgeReferenceSettings& settings) const
65 {
66  const KnowledgeRecord* ret = with(variable, settings);
67  if (ret)
68  {
69  if (settings.exception_on_unitialized && !ret->exists())
70  {
71  std::stringstream buffer;
72  buffer << "ERROR: settings do not allow reads of unset vars and ";
73  buffer << variable.get_name() << " is uninitialized";
74  throw exceptions::UninitializedException (buffer.str ());
75  }
76 
77  if (ret->has_history())
78  {
79  return ret->get_newest();
80  }
81  else
82  {
83  return *ret;
84  }
85  }
86  else
87  {
88  if (settings.exception_on_unitialized)
89  {
90  std::stringstream buffer;
91  buffer << "ERROR: settings do not allow reads of unset vars and ";
92  buffer << variable.get_name() << " is uninitialized";
93  throw exceptions::UninitializedException (buffer.str ());
94  }
95  }
96  return KnowledgeRecord();
97 }
98 
100  const std::string& key, const KnowledgeReferenceSettings& settings) const
101 {
102  const KnowledgeRecord* ret = with(key, settings);
103  if (ret)
104  {
105  if (settings.exception_on_unitialized && !ret->exists())
106  {
107  std::stringstream buffer;
108  buffer << "ERROR: settings do not allow reads of unset vars and ";
109  buffer << key << " is uninitialized";
110  throw exceptions::UninitializedException (buffer.str ());
111  }
112 
113  return *ret;
114  }
115  return KnowledgeRecord();
116 }
117 
119  const VariableReference& variable,
120  const KnowledgeReferenceSettings& settings) const
121 {
122  const KnowledgeRecord* ret = with(variable, settings);
123  if (ret)
124  {
125  if (settings.exception_on_unitialized && !ret->exists())
126  {
127  std::stringstream buffer;
128  buffer << "ERROR: settings do not allow reads of unset vars and ";
129  buffer << variable.get_name() << " is uninitialized";
130  throw exceptions::UninitializedException (buffer.str ());
131  }
132 
133  return *ret;
134  }
135  return KnowledgeRecord();
136 }
137 
139  const std::string& key, const KnowledgeReferenceSettings& settings)
140 {
141  KnowledgeMap::iterator found;
142 
143  MADARA_GUARD_TYPE guard(mutex_);
144 
145  if (settings.expand_variables)
146  {
147  std::string cur_key = expand_statement(key);
148  found = map_.find(cur_key);
149  }
150  else
151  {
152  found = map_.find(key);
153  }
154 
155  if (found != map_.end())
156  {
157  return &found->second;
158  }
159 
160  return nullptr;
161 }
162 
163 // return the value of a variable
165  const VariableReference& variable,
166  const KnowledgeReferenceSettings& settings)
167 {
168  MADARA_GUARD_TYPE guard(mutex_);
169 
170  KnowledgeRecord* ret = variable.get_record_unsafe();
171 
172  if (settings.exception_on_unitialized && !ret->exists())
173  {
174  std::stringstream buffer;
175  buffer << "ERROR: settings do not allow reads of unset vars and ";
176  buffer << variable.get_name() << " is uninitialized";
177  throw exceptions::UninitializedException (buffer.str ());
178  }
179 
180  return ret;
181 }
182 
184  const std::string& key, const KnowledgeReferenceSettings& settings) const
185 {
186  KnowledgeMap::const_iterator found;
187 
188  MADARA_GUARD_TYPE guard(mutex_);
189 
190  if (settings.expand_variables)
191  {
192  std::string cur_key = expand_statement(key);
193  found = map_.find(cur_key);
194  }
195  else
196  {
197  found = map_.find(key);
198  }
199 
200  if (found != map_.end())
201  {
202  if (settings.exception_on_unitialized && !found->second.exists())
203  {
204  std::stringstream buffer;
205  buffer << "ERROR: settings do not allow reads of unset vars and ";
206  buffer << found->first << " is uninitialized";
207  throw exceptions::UninitializedException (buffer.str ());
208  }
209 
210  return &found->second;
211  }
212 
213  return nullptr;
214 }
215 
216 // return the value of a variable
218  const VariableReference& variable,
219  const KnowledgeReferenceSettings& settings) const
220 {
221  MADARA_GUARD_TYPE guard(mutex_);
222 
223  KnowledgeRecord* ret = variable.get_record_unsafe();
224 
225  if (settings.exception_on_unitialized && !ret->exists())
226  {
227  std::stringstream buffer;
228  buffer << "ERROR: settings do not allow reads of unset vars and ";
229  buffer << variable.get_name() << " is uninitialized";
230  throw exceptions::UninitializedException (buffer.str ());
231  }
232 
233  return ret;
234 }
235 
236 // return the value of a variable
238  const VariableReference& variable,
239  const KnowledgeReferenceSettings& settings) const
240 {
241  MADARA_GUARD_TYPE guard(mutex_);
242 
243  auto ret = variable.get_record_unsafe();
244 
245  if (settings.exception_on_unitialized && !ret->exists())
246  {
247  std::stringstream buffer;
248  buffer << "ERROR: settings do not allow reads of unset vars and ";
249  buffer << variable.get_name() << " is uninitialized";
250  throw exceptions::UninitializedException (buffer.str ());
251  }
252 
253  return ret && ret->exists();
254 }
255 
256 // return the value of a variable
258  const VariableReference& variable, size_t index,
259  const KnowledgeReferenceSettings& settings)
260 {
261  MADARA_GUARD_TYPE guard(mutex_);
262 
263  auto record = variable.get_record_unsafe();
264 
265  if (settings.exception_on_unitialized && !record->exists())
266  {
267  std::stringstream buffer;
268  buffer << "ERROR: settings do not allow reads of unset vars and ";
269  buffer << variable.get_name() << " is uninitialized";
270  throw exceptions::UninitializedException (buffer.str ());
271  }
272 
273  if (record)
274  return record->retrieve_index(index);
275  else
277 }
278 
280  size_t index, const KnowledgeReferenceSettings& settings)
281 {
282  VariableReference variable = get_ref(key, settings);
283 
284  return retrieve_index(variable, index, settings);
285 }
286 
287 template<typename T>
289  const std::string& key, T&& value, const KnowledgeUpdateSettings& settings)
290 {
291  VariableReference variable = get_ref(key, settings);
292  return set(variable, std::forward<T>(value), settings);
293 }
294 
295 template<typename T>
296 inline int ThreadSafeContext::set(const VariableReference& variable, T&& value,
297  const KnowledgeUpdateSettings& settings)
298 {
299  MADARA_GUARD_TYPE guard(mutex_);
300 
301  if (variable.is_valid())
302  return set_unsafe(variable, std::forward<T>(value), settings);
303  else
304  return -1;
305 }
306 
307 template<typename T>
308 inline int ThreadSafeContext::set(const std::string& key, const T* value,
309  uint32_t size, const KnowledgeUpdateSettings& settings)
310 {
311  VariableReference variable = get_ref(key, settings);
312  return set(variable, value, size, settings);
313 }
314 
315 // set the value of a variable
316 template<typename T>
317 inline int ThreadSafeContext::set(const VariableReference& variable,
318  const T* value, uint32_t size, const KnowledgeUpdateSettings& settings)
319 {
320  MADARA_GUARD_TYPE guard(mutex_);
321  if (variable.is_valid())
322  {
323  return set_unsafe_impl(variable, settings, value, size);
324  }
325  else
326  return -1;
327 }
328 
329 template<typename... Args>
331  const KnowledgeUpdateSettings& settings, Args&&... args)
332 {
333  auto record = variable.get_record_unsafe();
334 
335  // check if we have the appropriate write quality
336  if (!settings.always_overwrite && record->write_quality < record->quality)
337  return -2;
338  else
339  record->quality = 0;
340 
341  record->set_value(std::forward<Args>(args)...);
342  record->quality = record->write_quality;
343  record->clock = clock_;
344  record->set_toi(utility::get_time());
345 
346  return 0;
347 }
348 
349 template<typename T>
351  T&& value, const KnowledgeUpdateSettings& settings)
352 {
353  int ret = set_unsafe_impl(variable, settings, std::forward<T>(value));
354 
355  if (ret == 0)
356  mark_and_signal(variable, settings);
357 
358  return ret;
359 }
360 
361 template<typename T>
362 // set the value of a variable
364  const T* value, size_t size, const KnowledgeUpdateSettings& settings)
365 {
366  int ret = set_unsafe_impl(variable, settings, value, size);
367 
368  if (ret == 0)
369  mark_and_signal(variable, settings);
370 
371  return ret;
372 }
373 
374 inline int ThreadSafeContext::set_xml(const std::string& key, const char* value,
375  size_t size, const KnowledgeUpdateSettings& settings)
376 {
377  VariableReference variable = get_ref(key, settings);
378  return set_xml(variable, value, size, settings);
379 }
380 
382  const char* value, size_t size, const KnowledgeUpdateSettings& settings)
383 {
384  VariableReference variable = get_ref(key, settings);
385  return set_text(variable, value, size, settings);
386 }
387 
389  const unsigned char* value, size_t size,
390  const KnowledgeUpdateSettings& settings)
391 {
392  VariableReference variable = get_ref(key, settings);
393  return set_jpeg(variable, value, size, settings);
394 }
395 
397  const unsigned char* value, size_t size,
398  const KnowledgeUpdateSettings& settings)
399 {
400  VariableReference variable = get_ref(key, settings);
401  return set_file(variable, value, size, settings);
402 }
403 
404 template<typename T>
405 inline int ThreadSafeContext::set_index(const std::string& key, size_t index,
406  T&& value, const KnowledgeUpdateSettings& settings)
407 {
408  VariableReference variable = get_ref(key, settings);
409  return set_index(variable, index, std::forward<T>(value), settings);
410 }
411 
412 template<typename T>
414  size_t index, T&& value, const KnowledgeUpdateSettings& settings)
415 {
416  MADARA_GUARD_TYPE guard(mutex_);
417  if (variable.is_valid())
418  return set_index_unsafe(variable, index, std::forward<T>(value), settings);
419  else
420  return -1;
421 }
422 
423 template<typename T>
425  const VariableReference& variable, size_t index, T&& value,
426  const KnowledgeUpdateSettings& settings)
427 {
428  auto record = variable.get_record_unsafe();
429  // check if we have the appropriate write quality
430  if (!settings.always_overwrite && record->write_quality < record->quality)
431  return -2;
432  else
433  record->quality = 0;
434 
435  record->set_index(index, std::forward<T>(value));
436  record->quality = record->write_quality;
437  record->clock = clock_;
438  record->set_toi(utility::get_time());
439 
440  return 0;
441 }
442 
443 template<typename T>
445  const VariableReference& variable, size_t index, T&& value,
446  const KnowledgeUpdateSettings& settings)
447 {
448  int ret =
449  set_index_unsafe_impl(variable, index, std::forward<T>(value), settings);
450 
451  if (ret == 0)
452  mark_and_signal(variable, settings);
453 
454  return ret;
455 }
456 
458  const std::string& key, const KnowledgeUpdateSettings& settings)
459 {
460  VariableReference variable = get_ref(key, settings);
461  return inc(variable, settings);
462 }
463 
465  const std::string& key, const KnowledgeUpdateSettings& settings)
466 {
467  VariableReference variable = get_ref(key, settings);
468  return dec(variable, settings);
469 }
470 
472  const VariableReference& variable, const KnowledgeUpdateSettings& settings)
473 {
474  MADARA_GUARD_TYPE guard(mutex_);
475  auto record = variable.get_record_unsafe();
476  if (record)
477  {
478  // check if we have the appropriate write quality
479  if (settings.always_overwrite || record->write_quality >= record->quality)
480  {
481  ++(*record);
482  record->quality = record->write_quality;
483  record->clock = clock_;
484  record->set_toi(utility::get_time());
485  mark_and_signal(variable, settings);
486  }
487 
488  return *record;
489  }
490 
492 }
493 
494 #ifndef _MADARA_NO_KARL_
495 
496 // return whether or not the key exists
497 inline bool ThreadSafeContext::delete_expression(const std::string& expression)
498 {
499  MADARA_GUARD_TYPE guard(mutex_);
500 
501  return interpreter_->delete_expression(expression);
502 }
503 
504 #endif // _MADARA_NO_KARL_
505 
507  const std::string& key, const KnowledgeReferenceSettings& settings)
508 {
509  // enter the mutex
510  bool found(false);
511  std::string key_actual;
512  const std::string* key_ptr;
513  MADARA_GUARD_TYPE guard(mutex_);
514 
515  if (settings.expand_variables)
516  {
517  key_actual = expand_statement(key);
518  key_ptr = &key_actual;
519  }
520  else
521  key_ptr = &key;
522 
523  // find the key and update found with result of find
524  KnowledgeMap::iterator record = map_.find(*key_ptr);
525  found = record != map_.end();
526 
527  if (found)
528  {
529  record->second.clear_value();
530  }
531 
532  return found;
533 }
534 
535 inline bool ThreadSafeContext::clear(const VariableReference& variable)
536 {
537  if (variable.is_valid())
538  {
539  MADARA_GUARD_TYPE guard(mutex_);
540 
541  // erase any changed or local changed map entries
542  // changed_map_.erase (variable.entry_->first.c_str ());
543  // local_changed_map_.erase (variable.entry_->first.c_str ());
544 
545  variable.entry_->second.clear_value();
546 
547  return true;
548  }
549  else
550  {
551  return false;
552  }
553 }
554 
555 // return whether or not the key exists
557  const std::string& key, const KnowledgeReferenceSettings& settings)
558 {
559  // enter the mutex
560  std::string key_actual;
561  bool result(false);
562 
563  const std::string* key_ptr;
564  MADARA_GUARD_TYPE guard(mutex_);
565 
566  if (settings.expand_variables)
567  {
568  key_actual = expand_statement(key);
569  key_ptr = &key_actual;
570  }
571  else
572  key_ptr = &key;
573 
574  // erase any changed or local changed map entries
575  changed_map_.erase(key_ptr->c_str());
576  local_changed_map_.erase(key_ptr->c_str());
577 
578  // erase the map
579  result = map_.erase(*key_ptr) == 1;
580 
581  return result;
582 }
583 
584 // return whether or not the key exists
587 {
588  // enter the mutex
589  MADARA_GUARD_TYPE guard(mutex_);
590 
591  // erase any changed or local changed map entries
592  changed_map_.erase(var.entry_->first.c_str());
593  local_changed_map_.erase(var.entry_->first.c_str());
594 
595  // erase the map
596  return map_.erase(var.entry_->first.c_str()) == 1;
597 }
598 
599 inline void ThreadSafeContext::delete_variables(KnowledgeMap::iterator begin,
600  KnowledgeMap::iterator end, const KnowledgeReferenceSettings&)
601 {
602  for (auto cur = begin; cur != end; ++cur)
603  {
604  changed_map_.erase(cur->first.c_str());
605  local_changed_map_.erase(cur->first.c_str());
606  }
607  map_.erase(begin, end);
608 }
609 
610 // return whether or not the key exists
612  const std::string& key, const KnowledgeReferenceSettings& settings) const
613 {
614  // enter the mutex
615  std::string key_actual;
616  const std::string* key_ptr;
617  MADARA_GUARD_TYPE guard(mutex_);
618 
619  if (settings.expand_variables)
620  {
621  key_actual = expand_statement(key);
622  key_ptr = &key_actual;
623  }
624  else
625  key_ptr = &key;
626 
627  // if key is not null
628  if (*key_ptr != "")
629  {
630  // find the key in the knowledge base
631  KnowledgeMap::const_iterator found = map_.find(*key_ptr);
632 
633  // if it's found, then return the value
634  if (found != map_.end())
635  return found->second.status() != knowledge::KnowledgeRecord::UNCREATED;
636  }
637 
638  // if no match, return empty (0)
639  return false;
640 }
641 
642 // Atomically decrement a stored value. Only reason we are inlining this
643 // function is because it is called by only one function, and we can save a bit
644 // of execution time via expansion into that function call.
646  const VariableReference& variable, const KnowledgeUpdateSettings& settings)
647 {
648  MADARA_GUARD_TYPE guard(mutex_);
649  auto record = variable.get_record_unsafe();
650  if (record)
651  {
652  // check if we have the appropriate write quality
653  if (settings.always_overwrite || record->write_quality >= record->quality)
654  {
655  --(*record);
656  record->quality = record->write_quality;
657  record->clock = clock_;
658  record->set_toi(utility::get_time());
659 
660  mark_and_signal(variable, settings);
661  }
662 
663  return *record;
664  }
665 
667 }
668 
671 inline uint64_t ThreadSafeContext::set_clock(uint64_t clock)
672 {
673  MADARA_GUARD_TYPE guard(mutex_);
674 
675  // clock_ is always increasing. We never reset it to a lower clock value
676  // user can check return value to see if the clock was set.
677  if (clock_ < clock)
678  clock_ = clock;
679 
680  return clock_;
681 }
682 
685 inline uint64_t ThreadSafeContext::set_clock(const std::string& key,
686  uint64_t clock, const KnowledgeReferenceSettings& settings)
687 {
688  // enter the mutex
689  std::string key_actual;
690  const std::string* key_ptr;
691  MADARA_GUARD_TYPE guard(mutex_);
692 
693  if (settings.expand_variables)
694  {
695  key_actual = expand_statement(key);
696  key_ptr = &key_actual;
697  }
698  else
699  key_ptr = &key;
700 
701  // check for null key
702  if (*key_ptr == "")
703  return 0;
704 
705  // create the key if it didn't exist
706  knowledge::KnowledgeRecord& record = map_[*key_ptr];
707 
708  // check for value already set
709  if (record.clock < clock)
710  {
711  record.clock = clock;
712 
713  // try to update the global clock as well
714  this->set_clock(clock);
715  }
716 
717  return record.clock;
718 }
719 
723  const std::string& key, const KnowledgeUpdateSettings& settings)
724 {
725  // enter the mutex
726  std::string key_actual;
727  const std::string* key_ptr;
728  MADARA_GUARD_TYPE guard(mutex_);
729 
730  if (settings.expand_variables)
731  {
732  key_actual = expand_statement(key);
733  key_ptr = &key_actual;
734  }
735  else
736  key_ptr = &key;
737 
738  // check for null key
739  if (*key_ptr == "")
740  return 0;
741 
742  // create the key if it didn't exist
743  knowledge::KnowledgeRecord& record = map_[*key_ptr];
744 
745  return record.clock += settings.clock_increment;
746 }
747 
750  const KnowledgeUpdateSettings& settings)
751 {
752  MADARA_GUARD_TYPE guard(mutex_);
753  return clock_ += settings.clock_increment;
754 }
755 
758 inline uint64_t ThreadSafeContext::get_clock(void) const
759 {
760  MADARA_GUARD_TYPE guard(mutex_);
761  return clock_;
762 }
763 
765 {
766  MADARA_GUARD_TYPE guard(mutex_);
767  return *logger_;
768 }
769 
771 {
772  MADARA_GUARD_TYPE guard(mutex_);
773  logger_ = &logger;
774 }
775 
778  const std::string& key, const KnowledgeReferenceSettings& settings) const
779 {
780  // enter the mutex
781  std::string key_actual;
782  const std::string* key_ptr;
783  MADARA_GUARD_TYPE guard(mutex_);
784 
785  if (settings.expand_variables)
786  {
787  key_actual = expand_statement(key);
788  key_ptr = &key_actual;
789  }
790  else
791  key_ptr = &key;
792 
793  // check for null key
794  if (*key_ptr == "")
795  return 0;
796 
797  // find the key in the knowledge base
798  KnowledgeMap::const_iterator found = map_.find(*key_ptr);
799 
800  // if it's found, then compare the value
801  if (found != map_.end())
802  {
803  return found->second.clock;
804  }
805  else
806  // key does not exist
807  return 0;
808 }
809 
812 inline void ThreadSafeContext::lock(void) const
813 {
814  mutex_.lock();
815 }
816 
817 inline bool ThreadSafeContext::try_lock(void) const
818 {
819  return mutex_.try_lock();
820 }
821 
823 inline void ThreadSafeContext::unlock(void) const
824 {
825  mutex_.unlock();
826 }
827 
831  const std::string& statement, unsigned int level) const
832 {
834  logger_, (int)level, this->expand_statement(statement).c_str());
835 }
836 
837 // clear all variables and their values
838 inline void ThreadSafeContext::clear(bool erase)
839 {
840  // enter the mutex
841  MADARA_GUARD_TYPE guard(mutex_);
842 
843  changed_map_.clear();
844  local_changed_map_.clear();
845 
846  if (erase)
847  {
848  map_.clear();
849  }
850  else
851  {
852  for (KnowledgeMap::iterator i = map_.begin(); i != map_.end(); ++i)
853  {
854  i->second.reset_value();
855  }
856  }
857 
858  changed_.MADARA_CONDITION_NOTIFY_ONE();
859 }
860 
863 inline void ThreadSafeContext::wait_for_change(bool extra_release)
864 {
865  // enter the mutex
866  MADARA_GUARD_TYPE guard(mutex_);
867 
868  // if the caller is relying on a recursive call (e.g. KnowlegeBase::wait),
869  // we'll need to call an extra release for this to work. Otherwise, the
870  // context would remain locked to the calling thread - even though it will
871  // now be put to sleep
872  if (extra_release)
873  mutex_.MADARA_LOCK_UNLOCK();
874 
875  changed_.wait(mutex_);
876 
877  // if (extra_release)
878  // mutex_.MADARA_LOCK_LOCK ();
879 }
880 
882  const std::string& key, const KnowledgeUpdateSettings& settings)
883 {
884  VariableReference ref = get_ref(key, settings);
885  if (ref.is_valid())
886  mark_to_send(ref, settings);
887 }
888 
890  const VariableReference& ref, const KnowledgeUpdateSettings& settings)
891 {
892  MADARA_GUARD_TYPE guard(mutex_);
893  if (ref.is_valid())
894  {
895  mark_to_send_unsafe(ref, settings);
896  }
897 }
898 
901 {
902  changed_map_[ref.get_name()] = std::move(ref);
903 
904  KnowledgeRecord& record = *ref.get_record_unsafe();
905  if (record.status() != KnowledgeRecord::MODIFIED)
906  record.set_modified();
907 }
908 
910  const std::string& key, const KnowledgeUpdateSettings& settings)
911 {
912  VariableReference ref = get_ref(key, settings);
913  if (ref.is_valid())
914  mark_to_checkpoint(ref, settings);
915 }
916 
918  const VariableReference& ref, const KnowledgeUpdateSettings& settings)
919 {
920  MADARA_GUARD_TYPE guard(mutex_);
921  if (ref.is_valid())
922  {
923  mark_to_checkpoint_unsafe(ref, settings);
924  }
925 }
926 
929 {
930  local_changed_map_[ref.get_name()] = std::move(ref);
931 
932  KnowledgeRecord& record = *ref.get_record_unsafe();
933  if (record.status() != KnowledgeRecord::MODIFIED)
934  record.set_modified();
935 }
936 
938  VariableReference ref, const KnowledgeUpdateSettings& settings)
939 {
940  // otherwise set the value
941  if (ref.get_name()[0] != '.' || settings.treat_locals_as_globals)
942  {
943  if (!settings.treat_globals_as_locals)
944  {
945  mark_to_send_unsafe(std::move(ref));
946  }
947  }
948 
949  // track local changes now checkpoints all state, not just local
950  if (settings.track_local_changes)
951  {
952  mark_to_checkpoint_unsafe(std::move(ref));
953  }
954 
955  if (settings.stream_changes && streamer_ != nullptr)
956  {
957  auto rec_ptr = ref.get_record_unsafe();
958  if (rec_ptr->has_history())
959  {
960  rec_ptr = &rec_ptr->ref_newest();
961  }
962 
963  streamer_->enqueue(ref.get_name(), *rec_ptr);
964  }
965 
966  if (settings.signal_changes)
967  changed_.MADARA_CONDITION_NOTIFY_ALL();
968 }
969 
971  const std::string& key, const KnowledgeUpdateSettings& settings)
972 {
973  VariableReference ref = get_ref(key, settings);
974  if (ref.is_valid())
975  mark_modified(ref, settings);
976 }
977 
979  const VariableReference& ref, const KnowledgeUpdateSettings& settings)
980 {
981  MADARA_GUARD_TYPE guard(mutex_);
982 
983  auto record = ref.get_record_unsafe();
984 
985  record->clock = clock_;
986  record->set_toi(utility::get_time());
987 
988  mark_and_signal(ref, settings);
989 }
990 
992 {
993  MADARA_GUARD_TYPE guard(mutex_);
994  std::stringstream result;
995 
996  result << changed_map_.size() << " modifications ready to send:\n";
997 
998  for (auto& entry : changed_map_)
999  {
1000  KnowledgeRecord& record = *entry.second.get_record_unsafe();
1001  if (record.is_binary_file_type())
1002  {
1003  result << "File: ";
1004  }
1005  else if (record.type() == knowledge::KnowledgeRecord::DOUBLE)
1006  {
1007  result << "Double: ";
1008  }
1009  else if (record.type() == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1010  {
1011  result << "Double array: ";
1012  }
1013  else if (record.type() == knowledge::KnowledgeRecord::INTEGER)
1014  {
1015  result << "Integer: ";
1016  }
1017  else if (record.type() == knowledge::KnowledgeRecord::INTEGER_ARRAY)
1018  {
1019  result << "Integer array: ";
1020  }
1021  else if (record.is_string_type())
1022  {
1023  result << "String: ";
1024  }
1025  else
1026  {
1027  result << "Unknown: ";
1028  }
1029 
1030  result << entry.first << " = " << record.to_string() << "\n";
1031  }
1032 
1033  return result.str();
1034 }
1035 
1038 {
1039  MADARA_GUARD_TYPE guard(mutex_);
1040 
1041  return changed_map_;
1042 }
1043 
1045  const std::map<std::string, bool> & send_list, bool reset)
1046 {
1047  MADARA_GUARD_TYPE guard(mutex_);
1048 
1049  KnowledgeMap map;
1050 
1051  // if there are no limiting prefixes, iterate through and reset
1052  if (send_list.size() == 0)
1053  {
1054  for (auto i = changed_map_.begin();
1055  i != changed_map_.end(); )
1056  {
1057  map.emplace_hint (map.end(),
1058  i->first, *i->second.get_record_unsafe());
1059 
1060  if (reset)
1061  {
1062  i = changed_map_.erase(i);
1063  }
1064  }
1065  }
1066  // if there are limiting prefixes, only copy over the prefixes
1067  else
1068  {
1069  // if the prefixes list is smaller than the changed_map_
1070  if (send_list.size() < changed_map_.size())
1071  {
1072  for (auto var : send_list)
1073  {
1074  auto found = changed_map_.find(var.first.c_str());
1075 
1076  if (found != changed_map_.end())
1077  {
1078  map.emplace_hint(
1079  map.end(), found->first, *found->second.get_record_unsafe());
1080  changed_map_.erase(found);
1081  }
1082  }
1083  }
1084  // else if the changed_map_ is smaller than the prefixes list
1085  else
1086  {
1087  for (auto i = changed_map_.begin(); i != changed_map_.end();)
1088  {
1089  if (send_list.find (i->first) != send_list.end())
1090  {
1091  map.emplace_hint (map.end(),
1092  i->first, *i->second.get_record_unsafe());
1093 
1094  if (reset)
1095  {
1096  i = changed_map_.erase (i);
1097  }
1098  }
1099  }
1100  }
1101  }
1102 
1103  return map;
1104 }
1105 
1107 {
1108  MADARA_GUARD_TYPE guard(mutex_);
1109 
1110  VariableReferences snapshot;
1111  snapshot.reserve(changed_map_.size());
1112  int cur = 0;
1113 
1115  "ThreadSafeContext::save_modifieds:"
1116  " changed_map.size=%d, snapshot.size=%d\n",
1117  (int)changed_map_.size(), (int)snapshot.size());
1118 
1119  for (auto& entry : changed_map_)
1120  {
1122  "ThreadSafeContext::save_modifieds:"
1123  " snapshot[%d].name=%s\n",
1124  cur, entry.first);
1125 
1126  snapshot.emplace_back(entry.second);
1127  }
1128 
1129  return snapshot;
1130 }
1131 
1133  const VariableReferences& modifieds) const
1134 {
1135  MADARA_GUARD_TYPE guard(mutex_);
1136 
1137  for (auto& entry : modifieds)
1138  {
1139  changed_map_.insert(std::make_pair(entry.get_name(), entry));
1140  }
1141 }
1142 
1145  void) const
1146 {
1147  MADARA_GUARD_TYPE guard(mutex_);
1148 
1149  return local_changed_map_;
1150 }
1151 
1154 {
1155  MADARA_GUARD_TYPE guard(mutex_);
1156 
1157  changed_map_.clear();
1158 }
1159 
1162 {
1163  MADARA_GUARD_TYPE guard(mutex_);
1164 
1165  // each synchronization counts as an event, since this is a
1166  // pretty important networking event
1167 
1168  //++this->clock_;
1169 
1170  for (KnowledgeMap::iterator i = map_.begin(); i != map_.end(); ++i)
1171  for (auto& entry : map_)
1172  {
1173  if (entry.first.size() > 0 && entry.first[0] != '.')
1174  {
1175  // local or global doesn't matter. Clock and modification
1176  // aren't really a part of local variable checking anyway
1177  // i->second.status = KnowledgeRecord::MODIFIED;
1178 
1179  if (entry.second.status() != knowledge::KnowledgeRecord::UNCREATED)
1180  {
1182  }
1183  else
1184  {
1185  entry.second.set_value(KnowledgeRecord::Integer(0));
1186  }
1187 
1188  // i->second.clock = this->clock_;
1189  }
1190  }
1191 }
1192 
1195 {
1196  MADARA_GUARD_TYPE guard(mutex_);
1197 
1198  changed_map_.erase(variable.c_str());
1199 }
1200 
1202 {
1203  MADARA_GUARD_TYPE guard(mutex_);
1204 
1205  local_changed_map_.clear();
1206 }
1207 
1209 inline void ThreadSafeContext::signal(bool lock) const
1210 {
1211  if (lock)
1212  {
1213  MADARA_GUARD_TYPE guard(mutex_);
1214  changed_.MADARA_CONDITION_NOTIFY_ONE();
1215  }
1216  else
1217  changed_.MADARA_CONDITION_NOTIFY_ONE();
1218 }
1219 
1220 inline void ThreadSafeContext::add_logger(const std::string& filename)
1221 {
1222  logger_->add_file(filename);
1223 }
1224 
1226 {
1227  return logger_->get_level();
1228 }
1229 
1230 inline void ThreadSafeContext::set_log_level(int level)
1231 {
1232  logger_->set_level(level);
1233 }
1234 }
1235 }
1236 
1237 #endif // _MADARA_THREADSAFECONTEXT_INL_
#define madara_logger_checked_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:62
madara::knowledge::KnowledgeRecord KnowledgeRecord
An exception for attempting to access an invalid context1.
bool delete_expression(const std::string &expression)
Attempts to delete an expression from cache.
Definition: Interpreter.inl:70
This class encapsulates an entry in a KnowledgeBase.
bool has_history() const
Return true if this record has a circular buffer history.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
uint32_t type(void) const
returns the type of the value
uint64_t clock
last modification lamport clock time
size_t get_newest(OutputIterator out, size_t count) const
Copy the count newest stored history entries of this record to the given output iterator,...
int status(void) const
returns the status of the record.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
bool is_binary_file_type(void) const
returns true if the knowledge record has a binary file type
bool is_string_type(void) const
returns true if the record is a string type (STRING, XML, TEXT_FILE)
void set_modified(void)
sets the status to modified
uint32_t quality
priority of the update
Settings for applying knowledge updates.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false)
bool exception_on_unitialized
throw an exception if reference is on uninitialized variable
Settings for applying knowledge updates.
uint64_t clock_increment
Default clock increment.
bool signal_changes
Toggle whether to signal changes have happened.
bool treat_locals_as_globals
Toggle whether updates to local variables are treated as global variables that should be sent over th...
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
bool stream_changes
Toggle for streaming support.
bool treat_globals_as_locals
Toggle whether updates to global variables are treated as local variables and not marked as modified ...
bool track_local_changes
Toggle for checkpointing support.
int set_index_unsafe(const VariableReference &variable, size_t index, T &&value, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
NON-Atomically sets the value of an array index to a value.
void unlock(void) const
Unlocks the mutex on this context.
bool exists(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically checks to see if a variable already exists.
void delete_variables(KnowledgeMap::iterator begin, KnowledgeMap::iterator end, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes variables.
void attach_logger(logger::Logger &logger) const
Attaches a logger to be used for printing.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void add_logger(const std::string &filename)
Adds a file to the logger.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
const VariableReferenceMap & get_modifieds(void) const
Retrieves a list of modified variables.
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically returns the current value of a variable.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
logger::Logger * logger_
Logger for printing.
int set_unsafe(const VariableReference &variable, T &&value, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
NON-Atomically sets the value of a variable to the specific value.
void mark_to_checkpoint(const VariableReference &variable, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Changes variable reference to modified at current clock for the purposes of checkpointing (even if it...
void apply_modified(void)
Changes all global variables to modified at current clock.
void add_modifieds(const VariableReferences &modifieds) const
Adds a list of VariableReferences to the current modified list.
std::unique_ptr< BaseStreamer > streamer_
Streaming provider for saving all updates.
void signal(bool lock=true) const
Signals that this thread is done with the context.
void mark_modified(const VariableReference &variable, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Marks the variable reference as updated for the purposes of sending or checkpointing knowledge (for g...
const VariableReferenceMap & get_local_modified(void) const
Retrieves a list of modified local variables.
int set_index_unsafe_impl(const VariableReference &variable, size_t index, T &&value, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
bool delete_expression(const std::string &expression)
Deletes the expression from the interpreter cache.
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
madara::knowledge::KnowledgeRecord get_actual(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings()) const
Atomically returns the underlying value of a variable, including history.
int set_index(const std::string &key, size_t index, T &&value, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of an array index to a value.
std::string debug_modifieds(void) const
Retrieves a stringified list of all modified variables that are ready to send over transport on next ...
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
int set(const std::string &key, T &&value, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to the specific record.
bool delete_variable(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes the key.
void mark_to_send(const VariableReference &variable, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Changes variable reference to modified at current clock, and queues it to send, even if it is a local...
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
bool try_lock(void) const
Locks the mutex on this context.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
void mark_to_checkpoint_unsafe(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Changes variable to modified at current clock for the purposes of checkpointing.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
VariableReferences save_modifieds(void) const
Saves the list of modified records to use later for resending.
madara::knowledge::KnowledgeRecord inc(const std::string &key, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the value of the variable.
void mark_and_signal(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
int set_unsafe_impl(const VariableReference &variable, const KnowledgeUpdateSettings &settings, Args &&... args)
KnowledgeRecord retrieve_index(const std::string &key, size_t index, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a value at a specified index within a knowledge array.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
int get_log_level(void)
Gets the log level.
KnowledgeMap get_modifieds_current(const std::map< std::string, bool > &send_list, bool reset=true)
Retrieves the current modifieds map.
void lock(void) const
Locks the mutex on this context.
uint64_t set_clock(uint64_t clock)
Atomically sets the lamport clock.
madara::knowledge::KnowledgeRecord dec(const std::string &key, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically decrements the value of the variable.
uint64_t inc_clock(const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the Lamport clock and returns the new clock time (intended for sending knowledg...
void set_log_level(int level)
Sets the log level.
void mark_to_send_unsafe(VariableReference ref, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Changes variable to modified at current clock, and queues it to send, even if it is a local that woul...
void reset_checkpoint(void) const
Reset all checkpoint variables in the modified lists.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void reset_modified(void)
Reset all variables to be unmodified.
void wait_for_change(bool extra_release=false)
Wait for a change to happen to the context.
madara::knowledge::KnowledgeRecord * with(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
Optimized reference to a variable within the knowledge base.
const char * get_name(void) const
Returns the name of the variable.
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
KnowledgeRecord * get_record_unsafe(void) const
Returns a pointer to the variable's KnowledgeRecord Do not use this pointer unless you've locked the ...
A multi-threaded logger for logging to one or more destinations.
Definition: Logger.h:165
int get_level(void)
Gets the maximum logging detail level.
Definition: Logger.inl:99
void add_file(const std::string &filename)
Adds a file to the logger.
Definition: Logger.inl:12
void set_level(int level)
Sets the maximum logging detail level.
Definition: Logger.inl:39
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
std::map< const char *, VariableReference, utility::ComparisonLessThan > VariableReferenceMap
a map of variable references
::std::map< std::string, KnowledgeRecord > KnowledgeMap
std::vector< VariableReference > VariableReferences
a vector of variable references
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
Copyright(c) 2020 Galois.