MADARA  3.4.1
SharedMemoryPush.cpp
Go to the documentation of this file.
1 
4 #include "SharedMemoryPush.h"
5 
7  const std::string & id,
10 : madara::transport::Base (id, new_settings, knowledge.get_context ())
11 {
12  // populate variables like buffer_ based on transport settings
13  Base::setup ();
14 }
15 
17 {
18 }
19 
20 long
22  const madara::knowledge::KnowledgeMap & modifieds)
23 {
27  long result (0);
28 
29  const char * print_prefix = "madara::transport::SharedMemoryPush";
30 
31  // copy the cosnt modifieds list to a nonconst version for filtering
32  knowledge::KnowledgeMap filtered_updates = modifieds;
33 
34  transport::TransportContext send_context(
36  receive_monitor_.get_bytes_per_second(),
37  send_monitor_.get_bytes_per_second(), (uint64_t)utility::get_time(),
38  (uint64_t)utility::get_time(), settings_.write_domain, id_);
39 
44  if(settings_.get_number_of_send_aggregate_filters() > 0 &&
45  filtered_updates.size() > 0)
46  {
47  settings_.filter_send(filtered_updates, send_context);
48  }
49  else
50  {
52  "%s:"
53  " No aggregate send filters were applied...\n",
54  print_prefix);
55  }
56 
61  if(settings_.get_number_of_receive_aggregate_filters() > 0 &&
62  filtered_updates.size() > 0)
63  {
64  settings_.filter_receive(filtered_updates, send_context);
65  }
66  else
67  {
69  "%s:"
70  " No aggregate receive filters were applied...\n",
71  print_prefix);
72  }
73 
77  for (auto kb : kbs_)
78  {
79  // try not to update ourself
80  if (id_ == kb.get_id())
81  {
82  continue;
83  }
84 
85  knowledge::ContextGuard guard(kb);
86  knowledge::ThreadSafeContext & context = kb.get_context();
87 
89  "%s:"
90  " Applying updates to context.\n",
91  print_prefix);
92 
93  uint64_t now = utility::get_time();
94 
95  for(auto i : filtered_updates)
96  {
97  const auto apply = [&](knowledge::KnowledgeRecord& record) {
98  int result = 0;
99 
100  record.set_toi(now);
101  result = record.apply(
102  context, i.first, i.second.quality, i.second.clock, false);
103 
104  if(result != 1)
105  {
107  "%s:"
108  " update %s=%s was rejected\n",
109  print_prefix, i.first.c_str(), record.to_string().c_str());
110  }
111  else
112  {
114  "%s:"
115  " update %s=%s was accepted\n",
116  print_prefix, i.first.c_str(), record.to_string().c_str());
117  }
118  };
119 
120  apply(i.second);
121  }
122  }
123 
124  return result;
125 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
const ThreadSafeContext * context_
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
This class stores variables and their values for use by any entity needing state information in a thr...
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Base class from which all transports must be derived.
Definition: Transport.h:46
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:32
virtual long send_data(const knowledge::KnowledgeMap &modifieds) override
Sends a list of updates to the domain.
SharedMemoryPush(const std::string &id, madara::transport::TransportSettings &new_settings, knowledge::KnowledgeBase &context)
Constructor.
Provides context about the transport.
Holds basic transport settings.
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
Copyright(c) 2020 Galois.