MADARA  3.2.3
KnowledgeBaseImpl.cpp
Go to the documentation of this file.
11 #include "madara/Boost.h"
12 
13 #include <sstream>
14 
15 #ifdef _MADARA_USING_ZMQ_
17 #endif
18 
19 #ifdef _USE_OPEN_SPLICE_
21 #endif // _USE_OPEN_SPLICE_
22 
23 #ifdef _USE_NDDS_
25 #endif // _USE_NDDS_
26 
27 #include <iostream>
28 
29 namespace utility = madara::utility;
30 
32 
33 namespace madara { namespace knowledge {
34 
37 {
    // Builds an identifier of the form <host>:<uuid>. The host part is the
    // supplied host, or the local host name when none was given, capped at
    // 30 characters. The uuid part is a random boost UUID without dashes.
38  // working copy of the host name (filled in below when empty)
39  std::string actual_host (std::move(host));
40 
    // no host supplied: ask boost::asio for the local host name and fall
    // back to localhost if that lookup throws
41  if (actual_host == "")
42  {
43  try {
44  actual_host = boost::asio::ip::host_name();
45  } catch (const std::exception &) {
46  actual_host = "localhost";
47  }
48  }
49 
    // keep at most the first 30 characters of the host portion
50  if (actual_host.size () > 30) {
51  actual_host.resize (30);
52  }
53 
54  actual_host += ":";
55 
    // generate a random UUID and strip the dash separators from its text form
56  auto uuid = boost::uuids::random_generator{}();
57  auto uuid_str = boost::uuids::to_string(uuid);
58  uuid_str.erase(std::remove(uuid_str.begin(), uuid_str.end(), '-'), uuid_str.end());
59 
60  actual_host += uuid_str;
61 
62  return actual_host;
63 }
64 
// Creates a transport of the kind selected by settings.type, appends it to
// transports_ and returns the resulting number of attached transports.
// The originator passed to the transport is id when non-empty, otherwise id_,
// otherwise a fresh value from setup_unique_hostport (which is also stored in
// id_). When the type is not recognized, or its support was compiled out, no
// transport is created and transports_ is left unchanged.
// NOTE(review): the log prefixes below mix attach_transport and
// activate_transport; confirm which name is intended.
65 size_t
68 {
69  madara::transport::Base * transport (0);
70  std::string originator (id);
71 
    // NOTE(review): id_ may be assigned here before map_.mutex_ is taken
    // further down; confirm callers cannot race on id_.
72  if (originator == "")
73  {
74  if (id_.size () > 0)
75  originator = id_;
76  else
77  originator = id_ = setup_unique_hostport ();
78  }
79 
81  "KnowledgeBaseImpl::attach_transport:" \
82  " activating transport type %d\n", settings.type);
83 
    // pick the concrete transport class that matches settings.type
84  if (settings.type == madara::transport::BROADCAST)
85  {
86  transport = new madara::transport::BroadcastTransport (originator, map_,
87  settings, true);
88  }
89  else if (settings.type == madara::transport::MULTICAST)
90  {
91  transport = new madara::transport::MulticastTransport (originator, map_,
92  settings, true);
93  }
94  else if (settings.type == madara::transport::SPLICE)
95  {
    // only available when built with _USE_OPEN_SPLICE_; otherwise transport
    // stays null
96 #ifdef _USE_OPEN_SPLICE_
98  "KnowledgeBaseImpl::activate_transport:" \
99  " creating Open Splice DDS transport.\n");
100 
101  transport = new madara::transport::SpliceDDSTransport (originator, map_,
102  settings, true);
103 #else
105  "KnowledgeBaseImpl::activate_transport:" \
106  " project was not generated with opensplice=1. Transport is invalid.\n");
107 #endif
108  }
109  else if (settings.type == madara::transport::NDDS)
110  {
     // only available when built with _USE_NDDS_; otherwise transport stays
     // null
111 #ifdef _USE_NDDS_
113  "KnowledgeBaseImpl::activate_transport:" \
114  " creating NDDS transport.\n");
115 
116  transport = new madara::transport::NddsTransport (originator, map_,
117  settings, true);
118 #else
120  "KnowledgeBaseImpl::activate_transport:" \
121  " project was not generated with ndds=1. Transport is invalid.\n");
122 #endif
123  }
124  else if (settings.type == madara::transport::UDP)
125  {
127  "KnowledgeBaseImpl::activate_transport:" \
128  " creating UDP transport.\n");
129 
130  transport = new madara::transport::UdpTransport (originator, map_,
131  settings, true);
132  }
133  else if (settings.type == madara::transport::ZMQ)
134  {
     // only available when built with _MADARA_USING_ZMQ_; otherwise transport
     // stays null
135 #ifdef _MADARA_USING_ZMQ_
137  "KnowledgeBaseImpl::activate_transport:" \
138  " creating ZMQ transport.\n");
139 
140  transport = new madara::transport::ZMQTransport (originator, map_,
141  settings, true);
142 #else
144  "KnowledgeBaseImpl::activate_transport:" \
145  " project was not generated with zmq=1. Transport is invalid.\n");
146 #endif
147  }
148  else if (settings.type == madara::transport::REGISTRY_SERVER)
149  {
151  "KnowledgeBaseImpl::activate_transport:" \
152  " creating UDP Registry Server transport.\n");
153 
154  transport = new madara::transport::UdpRegistryServer (originator, map_,
155  settings, true);
156  }
157  else if (settings.type == madara::transport::REGISTRY_CLIENT)
158  {
160  "KnowledgeBaseImpl::activate_transport:" \
161  " creating UDP Registry Client transport.\n");
162 
163  transport = new madara::transport::UdpRegistryClient (originator, map_,
164  settings, true);
165  }
166  else
167  {
169  "KnowledgeBaseImpl::activate_transport:" \
170  " no transport was specified. Setting transport to null.\n");
171  }
172 
     // the transports_ vector is only touched while map_.mutex_ is held
173  MADARA_GUARD_TYPE guard (map_.mutex_);
174 
175  // if we have a valid transport, add it to the transports vector
176  if (transport != 0)
177  {
178  transports_.emplace_back (transport);
179  }
180 
181  return transports_.size ();
182 }
183 
// Detaches and closes every attached transport. The transports_ vector is
// swapped into a local while map_.mutex_ is held, and each transport is then
// closed after the guard has been released.
184 void
186 {
187  decltype(transports_) old_transports;
188  {
189  MADARA_GUARD_TYPE guard (map_.mutex_);
190  using std::swap;
191  swap(old_transports, transports_);
192  }
193 
     // close outside the guarded scope; transports_ is already empty here
194  for (auto &transport : old_transports)
195  {
196  transport->close ();
197 
199  "KnowledgeBaseImpl::close_transport:" \
200  " transport has been closed\n");
201  }
202 }
203 
204 
205 #ifndef _MADARA_NO_KARL_
206 
209 const std::string & expression)
210 {
     // Logs the expression text, then delegates compilation to map_.compile
     // and returns its result.
212  "KnowledgeBaseImpl::compile:" \
213  " compiling %s\n", expression.c_str ());
214 
215  return map_.compile (expression);
216 }
217 
220 const WaitSettings & settings)
221 {
     // Compile the expression text, then hand off to the wait overload that
     // takes a CompiledExpression.
222  CompiledExpression compiled = compile (expression);
223  return wait (compiled, settings);
224 }
225 
228 CompiledExpression & ce,
229 const WaitSettings & settings)
230 {
     // Evaluates ce repeatedly until its result converts to a non-zero
     // integer or, when settings.max_wait_time is non-negative, until the
     // enforcer reports that it is done. Modifieds are sent after every
     // evaluation. Returns the result of the last evaluation.
231  // use the EpochEnforcer utility to keep track of sleeps
232  EpochEnforcer enforcer (settings.poll_frequency, settings.max_wait_time);
233 
234  // print the pre statement at highest log level (cannot be masked)
235  if (settings.pre_print_statement != "")
237 
238  // lock the context
239 
240  KnowledgeRecord last_value;
241  {
242  MADARA_GUARD_TYPE guard (map_.mutex_);
243 
245  "KnowledgeBaseImpl::wait:" \
246  " waiting on %s\n", ce.logic.c_str ());
247 
248  last_value = ce.expression.evaluate (settings);
249 
251  "KnowledgeBaseImpl::wait:" \
252  " completed first eval to get %s\n",
253  last_value.to_string ().c_str ());
254 
255  send_modifieds ("KnowledgeBaseImpl:wait", settings);
256  }
257 
258  // wait for expression to be true
259  while (!last_value.to_integer () &&
260  (settings.max_wait_time < 0 || !enforcer.is_done ()))
261  {
263  "KnowledgeBaseImpl::wait:" \
264  " last value didn't result in success\n");
265 
266  // Unlike the other wait statements, we allow for a time based wait.
267  // To do this, we allow a user to specify a poll frequency: when settings.poll_frequency is positive we sleep until the next epoch, otherwise we block until the context changes.
268  if (settings.poll_frequency > 0)
269  {
270  enforcer.sleep_until_next ();
271  }
272  else
273  {
274  map_.wait_for_change (true);
275  }
276 
277  // relock - basically we need to evaluate the tree again, and
278  // we can't have a bunch of people changing the variables
279  // while we're evaluating the tree.
280  {
281  MADARA_GUARD_TYPE guard (map_.mutex_);
282 
284  "KnowledgeBaseImpl::wait:" \
285  " waiting on %s\n", ce.logic.c_str ());
286 
287  last_value = ce.expression.evaluate (settings);
288 
290  "KnowledgeBaseImpl::wait:" \
291  " completed eval to get %s\n",
292  last_value.to_string ().c_str ());
293 
294  send_modifieds ("KnowledgeBaseImpl:wait", settings);
295 
296  }
297  map_.signal ();
298 
299  } // end while (!last)
300 
     // NOTE(review): this check does not look at last_value, so the timeout
     // message can also be logged when the expression became true after the
     // enforcer finished; confirm that is intended.
301  if (enforcer.is_done ())
302  {
304  "KnowledgeBaseImpl::wait:" \
305  " Evaluate did not succeed. Timeout occurred\n");
306  }
307 
308  // print the post statement at highest log level (cannot be masked)
309  if (settings.post_print_statement != "")
311 
312  return last_value;
313 }
314 
317 CompiledExpression & ce,
318 const EvalSettings & settings)
319 {
     // Evaluates the compiled expression once while holding map_.mutex_,
     // sends the resulting modifieds and returns the evaluation result.
320  KnowledgeRecord last_value;
321 
323  "KnowledgeBaseImpl::evaluate:" \
324  " evaluating %s.\n", ce.logic.c_str ());
325 
326  // iterators and tree for evaluation of interpreter results
327  //madara::expression::ExpressionTree tree;
328 
329  // print the pre statement at highest log level (cannot be masked)
330  if (settings.pre_print_statement != "")
332 
333  // lock the context from being updated by any ongoing threads
334  {
335  MADARA_GUARD_TYPE guard (map_.mutex_);
336 
337  // evaluate the already compiled expression tree
338  //tree = interpreter_.interpret (map_, expression);
339  last_value = ce.expression.evaluate (settings);
340 
341  send_modifieds ("KnowledgeBaseImpl:evaluate", settings);
342 
343  // print the post statement at highest log level (cannot be masked)
344  if (settings.post_print_statement != "")
346  }
347 
348  return last_value;
349 }
350 
354 const EvalSettings & settings)
355 {
     // Evaluates the tree rooted at root once through map_.evaluate while
     // holding map_.mutex_, sends the resulting modifieds and returns the
     // evaluation result.
356  KnowledgeRecord last_value;
357 
359  "KnowledgeBaseImpl::evaluate:" \
360  " evaluating ComponentNode rooted tree\n");
361 
362  // iterators and tree for evaluation of interpreter results
363  //madara::expression::ExpressionTree tree;
364 
365  // print the pre statement at highest log level (cannot be masked)
366  if (settings.pre_print_statement != "")
368 
369  // lock the context from being updated by any ongoing threads
370  {
371  MADARA_GUARD_TYPE guard (map_.mutex_);
372 
373  // evaluate the supplied root node against the context
374  //tree = interpreter_.interpret (map_, expression);
375  last_value = map_.evaluate (root, settings);
376 
377  send_modifieds ("KnowledgeBaseImpl:evaluate", settings);
378 
379  // print the post statement at highest log level (cannot be masked)
380  if (settings.post_print_statement != "")
382  }
383 
384  return last_value;
385 }
386 
387 #endif // _MADARA_NO_KARL_
388 
// Sends the modified variables of the context over every attached transport,
// restricted to the names in settings.send_list when that list is non-empty.
// prefix is only used to label the log messages.
// Returns 0 when there were modifications to process, -1 when nothing was
// modified, -2 when no transport is attached and -3 when transports exist but
// settings.delay_sending_modifieds is set.
389 int
391  const std::string & prefix,
392  const EvalSettings & settings)
393 {
394  int result = 0;
395 
396  MADARA_GUARD_TYPE guard (map_.mutex_);
397 
398  if (transports_.size () > 0 && !settings.delay_sending_modifieds)
399  {
400  const VariableReferenceMap & modified = map_.get_modifieds ();
401 
402  if (modified.size () > 0)
403  {
404  // if there is not an allowed send_list list
405  if (settings.send_list.size () == 0)
406  {
407  // send across each transport
408  for (auto &transport : transports_) {
409  transport->send_data (modified);
410  }
411 
412 
413  // reset the modified map
414  map_.reset_modified ();
415  }
416  // if there is a send_list
417  else
418  {
419  VariableReferenceMap allowed_modifieds;
420  // otherwise, we are only allowed to send a subset of modifieds
421  for (const auto &entry : modified)
422  {
423  if (settings.send_list.find (entry.first) != settings.send_list.end ())
424  {
425  allowed_modifieds.emplace_hint(allowed_modifieds.end(), entry);
426  }
427  }
428 
429  // if the subset was greater than zero, we send the subset
     // (when the subset is empty nothing is sent, yet result stays 0)
430  if (allowed_modifieds.size () > 0)
431  {
432  // send across each transport
433  for (auto &transport : transports_) {
434  transport->send_data (allowed_modifieds);
435  }
436 
437  // reset modified list for the allowed modifications
438  for (const auto &entry : allowed_modifieds)
439  {
440  map_.reset_modified (entry.first);
441  }
442  }
443  }
444 
     // the clock is advanced whenever the modified map was non-empty,
     // whether or not anything passed the send_list filter
445  map_.inc_clock (settings);
446 
447  if (settings.signal_changes)
448  map_.signal (false);
449  }
450  else
451  {
453  "%s: no modifications to send\n", prefix.c_str ());
454 
455  result = -1;
456  }
457  }
458  else
459  {
     // a missing transport takes precedence over the delay flag
460  if (transports_.size () == 0)
461  {
463  "%s: no transport configured\n", prefix.c_str ());
464 
465  result = -2;
466  }
467  else if (settings.delay_sending_modifieds)
468  {
470  "%s: user requested to not send modifieds\n", prefix.c_str ());
471 
472  result = -3;
473  }
474  }
475 
476  return result;
477 }
478 
479 } }
This class encapsulates an entry in a KnowledgeBase.
double max_wait_time
Maximum time to wait for an expression to become true (in seconds)
Definition: WaitSettings.h:113
std::map< std::string, bool > send_list
Map of record names that are allowed to be sent after operation.
Definition: EvalSettings.h:131
Multicast-based transport for knowledge.
Broadcast-based transport for knowledge.
std::string pre_print_statement
Statement to print before evaluations.
Definition: EvalSettings.h:120
bool signal_changes
Toggle whether to signal changes have happened.
void signal(bool lock=true) const
Signals that this thread is done with the context.
const VariableReferenceMap & get_modifieds(void) const
Retrieves a list of modified variables.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
std::string logic
the logic that was compiled
void close_transport(void)
Closes the transport mechanism so no dissemination is possible.
Holds basic transport settings.
Compiled, optimized KaRL logic.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
size_t attach_transport(madara::transport::Base *transport)
Attaches a transport to the Knowledge Engine.
std::string post_print_statement
Statement to print after evaluations.
Definition: EvalSettings.h:125
uint32_t type
Type of transport. See madara::transport::Types for options.
madara::knowledge::KnowledgeRecord wait(const std::string &expression)
Waits for an expression to be non-zero.
bool is_done(void) const
Checks to see if max duration is finished.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
madara::expression::ExpressionTree expression
the expression tree
static struct madara::knowledge::tags::string_t string
std::vector< std::unique_ptr< transport::Base > > transports_
This class provides an interface into the Open Splice dissemination transport.
Enforces a periodic epoch.
Definition: EpochEnforcer.h:17
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
An abstract base class defines a simple abstract implementation of an expression tree node...
Definition: ComponentNode.h:36
double poll_frequency
Frequency to poll an expression for truth (in seconds)
Definition: WaitSettings.h:108
ZMQ-based transport for knowledge.
Definition: ZMQTransport.h:43
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
Integer to_integer(void) const
converts the value to an integer.
This class provides an interface into the NDDS dissemination transport.
Definition: NddsTransport.h:22
std::string setup_unique_hostport(std::string host="")
Creates a random UUID for unique tie breakers in global ordering.
void wait_for_change(bool extra_release=false)
Wait for a change to happen to the context.
uint64_t inc_clock(const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the Lamport clock and returns the new clock time (intended for sending knowledg...
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
Encapsulates settings for an evaluation statement.
Definition: EvalSettings.h:27
Provides utility functions and classes for common tasks and needs.
Definition: IteratorImpl.h:14
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression)
Evaluates an expression.
Provides functions and classes for the distributed knowledge base.
MADARA_EXPORT int send_modifieds(const std::string &prefix, const EvalSettings &settings=EvalSettings())
Sends all modified variables through the attached transports.
Copyright (c) 2015 Carnegie Mellon University.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
UDP-based transport for knowledge.
bool delay_sending_modifieds
Toggle for sending modifieds in a single update event after each evaluation.
Definition: EvalSettings.h:115
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:23
UDP-based transport for knowledge.
Definition: UdpTransport.h:37
utility::EpochEnforcer< std::chrono::steady_clock > EpochEnforcer
Base class from which all transports must be derived.
Definition: Transport.h:45
UDP-based server that handles a registry of UDP endpoints, which makes it ideal for any NAT-protected...
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
void sleep_until_next(void)
Sleeps until the next epoch.
Definition: EpochEnforcer.h:92
void reset_modified(void)
Reset all variables to be unmodified.