MADARA  3.4.1
KnowledgeBaseImpl.cpp
Go to the documentation of this file.
11 #include "madara/Boost.h"
12 
13 #include <sstream>
14 
15 #ifdef _MADARA_USING_ZMQ_
17 #endif
18 
19 #ifdef _USE_OPEN_SPLICE_
21 #endif // _USE_OPEN_SPLICE_
22 
23 #ifdef _USE_NDDS_
25 #endif // _USE_NDDS_
26 
27 #include <iostream>
28 #include <random>
29 
30 namespace utility = madara::utility;
31 
33 
34 namespace madara
35 {
36 namespace knowledge
37 {
39 {
40  // placeholder for our ip address
41  std::string actual_host(std::move(host));
42 
43  if (actual_host == "")
44  {
45  try
46  {
47  actual_host = boost::asio::ip::host_name();
48  }
49  catch (const std::exception&)
50  {
51  actual_host = "localhost";
52  }
53  }
54 
55  if (actual_host.size() > 30)
56  {
57  actual_host.resize(30);
58  }
59 
60  actual_host += ":";
61 
62  boost::mt19937 ran(std::random_device{}());
63  auto uuid = boost::uuids::basic_random_generator<boost::mt19937>(ran)();
64  auto uuid_str = boost::uuids::to_string(uuid);
65  uuid_str.erase(
66  std::remove(uuid_str.begin(), uuid_str.end(), '-'), uuid_str.end());
67 
68  actual_host += uuid_str;
69 
70  return actual_host;
71 }
72 
74  const std::string& id, transport::TransportSettings& settings)
75 {
76  madara::transport::Base* transport(0);
77  std::string originator(id);
78 
79  if (originator == "")
80  {
81  if (id_.size() > 0)
82  originator = id_;
83  else
84  originator = id_ = setup_unique_hostport();
85  }
86 
88  "KnowledgeBaseImpl::attach_transport:"
89  " activating transport type %d\n",
90  settings.type);
91 
92  if (settings.type == madara::transport::BROADCAST)
93  {
95  originator, map_, settings, true);
96  }
97  else if (settings.type == madara::transport::MULTICAST)
98  {
100  originator, map_, settings, true);
101  }
102  else if (settings.type == madara::transport::SPLICE)
103  {
104 #ifdef _USE_OPEN_SPLICE_
106  "KnowledgeBaseImpl::activate_transport:"
107  " creating Open Splice DDS transport.\n");
108 
110  originator, map_, settings, true);
111 #else
113  "KnowledgeBaseImpl::activate_transport:"
114  " project was not generated with opensplice=1. Transport is "
115  "invalid.\n");
116 #endif
117  }
118  else if (settings.type == madara::transport::NDDS)
119  {
120 #ifdef _USE_NDDS_
122  "KnowledgeBaseImpl::activate_transport:"
123  " creating NDDS transport.\n");
124 
125  transport =
126  new madara::transport::NddsTransport(originator, map_, settings, true);
127 #else
129  "KnowledgeBaseImpl::activate_transport:"
130  " project was not generated with ndds=1. Transport is invalid.\n");
131 #endif
132  }
133  else if (settings.type == madara::transport::UDP)
134  {
136  "KnowledgeBaseImpl::activate_transport:"
137  " creating UDP transport.\n");
138 
139  transport =
140  new madara::transport::UdpTransport(originator, map_, settings, true);
141  }
142  else if (settings.type == madara::transport::ZMQ)
143  {
144 #ifdef _MADARA_USING_ZMQ_
146  "KnowledgeBaseImpl::activate_transport:"
147  " creating ZMQ transport.\n");
148 
149  transport =
150  new madara::transport::ZMQTransport(originator, map_, settings, true);
151 #else
153  "KnowledgeBaseImpl::activate_transport:"
154  " project was not generated with zmq=1. Transport is invalid.\n");
155 #endif
156  }
157  else if (settings.type == madara::transport::REGISTRY_SERVER)
158  {
160  "KnowledgeBaseImpl::activate_transport:"
161  " creating UDP Registry Server transport.\n");
162 
163  transport = new madara::transport::UdpRegistryServer(
164  originator, map_, settings, true);
165  }
166  else if (settings.type == madara::transport::REGISTRY_CLIENT)
167  {
169  "KnowledgeBaseImpl::activate_transport:"
170  " creating UDP Registry Client transport.\n");
171 
172  transport = new madara::transport::UdpRegistryClient(
173  originator, map_, settings, true);
174  }
175  else
176  {
178  "KnowledgeBaseImpl::activate_transport:"
179  " no transport was specified. Setting transport to null.\n");
180  }
181 
182  {
183  MADARA_GUARD_TYPE guard(transport_mutex_);
184 
185  // if we have a valid transport, add it to the transports vector
186  if (transport != 0)
187  {
188  transports_.emplace_back(transport);
189  }
190 
191  return transports_.size();
192  }
193 }
194 
196 {
197  decltype(transports_) old_transports;
198  {
199  MADARA_GUARD_TYPE guard(transport_mutex_);
200  using std::swap;
201  swap(old_transports, transports_);
202  }
203 
204  for (auto& transport : old_transports)
205  {
206  transport->close();
207 
209  "KnowledgeBaseImpl::close_transport:"
210  " transport has been closed\n");
211  }
212 }
213 
214 #ifndef _MADARA_NO_KARL_
215 
217 {
219  "KnowledgeBaseImpl::compile:"
220  " compiling %s\n",
221  expression.c_str());
222 
223  return map_.compile(expression);
224 }
225 
227  const std::string& expression, const WaitSettings& settings)
228 {
229  CompiledExpression compiled = compile(expression);
230  return wait(compiled, settings);
231 }
232 
234  CompiledExpression& ce, const WaitSettings& settings)
235 {
236  // use the EpochEnforcer utility to keep track of sleeps
237  EpochEnforcer enforcer(settings.poll_frequency, settings.max_wait_time);
238 
239  // print the pre statement at highest log level (cannot be masked)
240  if (settings.pre_print_statement != "")
242 
243  // lock the context
244 
245  KnowledgeRecord last_value;
246  {
247  MADARA_GUARD_TYPE guard(map_.mutex_);
248 
250  "KnowledgeBaseImpl::wait:"
251  " waiting on %s\n",
252  ce.logic.c_str());
253 
254  last_value = ce.expression.evaluate(settings);
255 
257  "KnowledgeBaseImpl::wait:"
258  " completed first eval to get %s\n",
259  last_value.to_string().c_str());
260  }
261 
262  send_modifieds("KnowledgeBaseImpl:wait", settings);
263 
264  // wait for expression to be true
265  while (!last_value.to_integer() &&
266  (settings.max_wait_time < 0 || !enforcer.is_done()))
267  {
269  "KnowledgeBaseImpl::wait:"
270  " last value didn't result in success\n");
271 
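272  // Unlike the other wait statements, we allow for a time based wait.
273  // To do this, we allow a user to specify a poll frequency: if it is positive, we sleep until the next epoch; otherwise, we wait for the context to change.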
274  if (settings.poll_frequency > 0)
275  {
276  enforcer.sleep_until_next();
277  }
278  else
279  {
280  map_.wait_for_change(true);
281  }
282 
283  // relock - basically we need to evaluate the tree again, and
284  // we can't have a bunch of people changing the variables
285  // while we're evaluating the tree.
286  {
287  MADARA_GUARD_TYPE guard(map_.mutex_);
288 
290  "KnowledgeBaseImpl::wait:"
291  " waiting on %s\n",
292  ce.logic.c_str());
293 
294  last_value = ce.expression.evaluate(settings);
295 
297  "KnowledgeBaseImpl::wait:"
298  " completed eval to get %s\n",
299  last_value.to_string().c_str());
300  }
301 
302  send_modifieds("KnowledgeBaseImpl:wait", settings);
303  map_.signal();
304 
305  } // end while (!last)
306 
307  if (enforcer.is_done())
308  {
310  "KnowledgeBaseImpl::wait:"
311  " Evaluate did not succeed. Timeout occurred\n");
312  }
313 
314  // print the post statement at highest log level (cannot be masked)
315  if (settings.post_print_statement != "")
317 
318  return last_value;
319 }
320 
322  CompiledExpression& ce, const EvalSettings& settings)
323 {
324  KnowledgeRecord last_value;
325 
327  "KnowledgeBaseImpl::evaluate:"
328  " evaluating %s.\n",
329  ce.logic.c_str());
330 
331  // iterators and tree for evaluation of interpreter results
332  // madara::expression::ExpressionTree tree;
333 
334  // print the pre statement at highest log level (cannot be masked)
335  if (settings.pre_print_statement != "")
337 
338  // lock the context from being updated by any ongoing threads
339  {
340  {
341  MADARA_GUARD_TYPE guard(map_.mutex_);
342 
343  // interpret the current expression and then evaluate it
344  // tree = interpreter_.interpret (map_, expression);
345  last_value = ce.expression.evaluate(settings);
346  }
347 
348  send_modifieds("KnowledgeBaseImpl:evaluate", settings);
349 
350  // print the post statement at highest log level (cannot be masked)
351  if (settings.post_print_statement != "")
353  }
354 
355  return last_value;
356 }
357 
359  expression::ComponentNode* root, const EvalSettings& settings)
360 {
361  KnowledgeRecord last_value;
362 
364  "KnowledgeBaseImpl::evaluate:"
365  " evaluating ComponentNode rooted tree\n");
366 
367  // iterators and tree for evaluation of interpreter results
368  // madara::expression::ExpressionTree tree;
369 
370  // print the pre statement at highest log level (cannot be masked)
371  if (settings.pre_print_statement != "")
373 
374  // lock the context from being updated by any ongoing threads
375  {
376  {
377  MADARA_GUARD_TYPE guard(map_.mutex_);
378 
379  // interpret the current expression and then evaluate it
380  // tree = interpreter_.interpret (map_, expression);
381  last_value = map_.evaluate(root, settings);
382  }
383 
384  send_modifieds("KnowledgeBaseImpl:evaluate", settings);
385 
386  // print the post statement at highest log level (cannot be masked)
387  if (settings.post_print_statement != "")
389  }
390 
391  return last_value;
392 }
393 
394 #endif // _MADARA_NO_KARL_
395 
397  const std::string& prefix, const EvalSettings& settings)
398 {
399  if (settings.delay_sending_modifieds)
400  {
402  "%s: user requested to not send modifieds\n", prefix.c_str());
403 
404  return -3;
405  }
406 
407  {
408  MADARA_GUARD_TYPE done_sending_guard(done_sending_mutex_);
409  done_sending_ = false;
410  }
411 
412  // Loop until threads stop asking us to repeat, which will occur if they
413  // try to send while we're sending.
414  for (;;)
415  {
416  // Limit scope of send_guard
417  {
418  std::unique_lock<MADARA_LOCK_TYPE> send_guard;
419 
420  // Limit scope of done_sending_guard
421  {
422  MADARA_GUARD_TYPE done_sending_guard(done_sending_mutex_);
423 
424  std::unique_lock<MADARA_LOCK_TYPE>
425  send_guard_tmp(send_mutex_, std::try_to_lock);
426 
427  if (!send_guard_tmp.owns_lock())
428  {
429  // Some other thread is currently doing send_modifieds. Signal it to
430  // repeat in case there are new updates to send.
431  done_sending_ = false;
432  return 0;
433  }
434 
435  // If flag is already set, stop looping. Other threads will clear while
436  // we're in this loop if they fail to take send_mutex_.
437  if (done_sending_)
438  {
439  return 0;
440  }
441 
442  send_guard = std::move(send_guard_tmp);
443 
444  done_sending_ = true;
445  }
446  // release done_sending_mutex_
447 
448  // We hold send_mutex_ here
449 
450  auto transports = get_transports();
451 
452  if (transports.size() == 0)
453  {
455  "%s: no transport configured\n", prefix.c_str());
456 
457  return -2;
458  }
459 
460  // get the modifieds and reset those that will be sent, atomically
461  auto modified = map_.get_modifieds_current(settings.send_list, true);
462 
463  if (modified.size() == 0)
464  {
466  "%s: no modifications to send\n", prefix.c_str());
467 
468  return -1;
469  }
470 
471  // send across each transport
472  for (auto& transport : transports)
473  {
474  transport->send_data(modified);
475  }
476  }
477  // Released send_mutex_
478 
479  map_.inc_clock(settings);
480 
481  if (settings.signal_changes)
482  map_.signal(false);
483  }
484 }
485 
486 }
487 }
utility::EpochEnforcer< std::chrono::steady_clock > EpochEnforcer
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
An abstract base class defines a simple abstract implementation of an expression tree node.
Definition: ComponentNode.h:37
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
Compiled, optimized KaRL logic.
std::string logic
the logic that was compiled
madara::expression::ExpressionTree expression
the expression tree
std::string setup_unique_hostport(std::string host="")
Creates a random UUID for unique tie breakers in global ordering.
size_t attach_transport(madara::transport::Base *transport)
Attaches a transport to the Knowledge Engine.
std::vector< std::shared_ptr< transport::Base > > get_transports()
Atomically retrieve the set of transports.
void close_transport(void)
Closes the transport mechanism so no dissemination is possible.
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression)
Evaluates an expression.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
MADARA_EXPORT int send_modifieds(const std::string &prefix, const EvalSettings &settings=EvalSettings::SEND)
Sends all modified variables through the attached transports.
madara::knowledge::KnowledgeRecord wait(const std::string &expression)
Waits for an expression to be non-zero.
std::vector< std::shared_ptr< transport::Base > > transports_
This class encapsulates an entry in a KnowledgeBase.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
Integer to_integer(void) const
converts the value to an integer.
bool signal_changes
Toggle whether to signal changes have happened.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void signal(bool lock=true) const
Signals that this thread is done with the context.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
KnowledgeMap get_modifieds_current(const std::map< std::string, bool > &send_list, bool reset=true)
Retrieves the current modifieds map.
uint64_t inc_clock(const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the Lamport clock and returns the new clock time (intended for sending knowledge updates).
void print(unsigned int level) const
Atomically prints all variables and values in the context.
void wait_for_change(bool extra_release=false)
Wait for a change to happen to the context.
Base class from which all transports must be derived.
Definition: Transport.h:46
Broadcast-based transport for knowledge.
Multicast-based transport for knowledge.
This class provides an interface into the Open Splice dissemination transport.
Holds basic transport settings.
uint32_t type
Type of transport. See madara::transport::Types for options.
UDP-based transport for knowledge.
UDP-based server that handles a registry of UDP endpoints, which makes it ideal for any NAT-protected...
UDP-based transport for knowledge.
Definition: UdpTransport.h:39
ZMQ-based transport for knowledge.
Definition: ZMQTransport.h:44
Enforces a periodic epoch.
Definition: EpochEnforcer.h:18
bool is_done(void) const
Checks to see if max duration is finished.
void sleep_until_next(void)
Sleeps until the next epoch.
Definition: EpochEnforcer.h:91
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
Provides utility functions and classes for common tasks and needs.
Definition: IteratorImpl.h:15
Copyright(c) 2020 Galois.
Encapsulates settings for an evaluation statement.
Definition: EvalSettings.h:26
std::map< std::string, bool > send_list
Map of record names that are allowed to be sent after operation.
Definition: EvalSettings.h:157
std::string post_print_statement
Statement to print after evaluations.
Definition: EvalSettings.h:151
bool delay_sending_modifieds
Toggle for sending modifieds in a single update event after each evaluation.
Definition: EvalSettings.h:141
std::string pre_print_statement
Statement to print before evaluations.
Definition: EvalSettings.h:146
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:25
double max_wait_time
Maximum time to wait for an expression to become true (in seconds)
Definition: WaitSettings.h:136
double poll_frequency
Frequency to poll an expression for truth (in seconds)
Definition: WaitSettings.h:131