MADARA  3.2.3
KnowledgeRecordFilters.cpp
Go to the documentation of this file.
3 
5 #include <memory>
7 
8 #ifdef _MADARA_PYTHON_CALLBACKS_
9 
10 #include <boost/python/call.hpp>
12 
13 #endif
14 
15 #ifdef _MADARA_JAVA_
16 
17 #include <jni.h>
18 #include "madara_jni.h"
20 
21 #endif
22 
// Default constructor (its signature line is not present in this listing):
// the object starts with no attached context.
24  : context_ (0)
25 {
26 }
27 
// Copy constructor (its opening signature line is not present in this
// listing): copies the per-type filter map and the context pointer from
// `filters`.
// NOTE(review): the listing's numbering skips a line inside the initializer
// list, so one more member initializer may exist — confirm against the
// real source.
29  const knowledge::KnowledgeRecordFilters & filters)
30  : filters_ (filters.filters_),
32  context_ (filters.context_)
33 {
34 }
35 
// NOTE(review): the signature line is not present in this listing;
// presumably this is the destructor. The body is empty.
37 {
38 }
39 
// Assignment operator (its signature line is not present in this listing).
// Copies the per-type filter map and the context pointer from rhs.
// NOTE(review): the listing's numbering skips a line between the two
// assignments, so another member may also be copied — confirm.
40 void
43 {
44  if (this != &rhs) // self-assignment is a no-op
45  {
46  filters_ = rhs.filters_;
48  context_ = rhs.context_;
49  }
50 }
51 
// add, C-function record filter overload (its signature line is not present
// in this listing). Appends `function` to the filter chain of every type
// whose bit is set in `types`. A null function pointer is ignored.
52 void
55 {
56  if (function != 0)
57  {
61  "KnowledgeRecordFilters::add: "
62  "Adding C record filter\n");
63 
64  // start with 1st bit, check every bit until types is 0
65  for (uint32_t cur = 1; types > 0; cur <<= 1)
66  {
67  // if current is set in the bitmask
68  if (madara::utility::bitmask_check (types, cur))
69  {
70  // append the filter to the chain for the type cur
71  filters_[cur].push_back (Function (function));
72  }
73 
74  // remove the current flag from the types
75  types = madara::utility::bitmask_remove (types, cur);
76  }
77  }
78 }
79 
// add, C-function aggregate filter overload (parts of its signature are not
// present in this listing). Wraps `function` in an AggregateFilter and
// appends it to the aggregate filter list. A null pointer is ignored.
80 void
82  void (*function) (
84 {
85  if (function != 0)
86  {
90  "KnowledgeRecordFilters::add: "
91  "Adding C aggregate filter\n");
92 
93  aggregate_filters_.push_back (AggregateFilter (function));
94  }
95 }
96 
// add, aggregate functor overload (its name line is not present in this
// listing). Wraps `functor` in an AggregateFilter entry and appends it to
// the aggregate filter list. A null pointer is ignored.
97 void
99  filters::AggregateFilter * functor)
100 {
101  if (functor != 0)
102  {
106  "KnowledgeRecordFilters::add: "
107  "Adding aggregate functor filter\n");
108 
109  aggregate_filters_.push_back (AggregateFilter (functor));
110  }
111 }
112 
// add, buffer filter overload (its name line is not present in this
// listing). Appends `functor` to the buffer filter list; a null pointer is
// ignored.
// NOTE(review): the raw pointer is stored as-is, so the functor presumably
// has to outlive this object — confirm ownership with the caller.
113 void
115 filters::BufferFilter * functor)
116 {
117  if (functor != 0)
118  {
122  "KnowledgeRecordFilters::add: "
123  "Adding buffer functor filter\n");
124 
125  buffer_filters_.push_back (functor);
126  }
127 }
128 
// add, record functor overload (its name line is not present in this
// listing). Appends `functor` to the filter chain of every type whose bit
// is set in `types`. A null pointer is ignored.
129 void
131  uint32_t types,
132  filters::RecordFilter * functor)
133 {
134  if (functor != 0)
135  {
139  "KnowledgeRecordFilters::add: "
140  "Adding record function filter to types\n");
141 
142  // start with 1st bit, check every bit until types is 0
143  for (uint32_t cur = 1; types > 0; cur <<= 1)
144  {
145  // if current is set in the bitmask
146  if (madara::utility::bitmask_check (types, cur))
147  {
148  // append the filter to the chain for the type cur
149  filters_[cur].push_back (Function (functor));
150  }
151 
152  // remove the current flag from the types
153  types = madara::utility::bitmask_remove (types, cur);
154  }
155  }
156 }
157 
158 #ifdef _MADARA_JAVA_
159 
// add, Java record filter overload (its name line is not present in this
// listing; only compiled when _MADARA_JAVA_ is defined). Appends `callable`
// to the filter chain of every type whose bit is set in `types`. A NULL
// callable is ignored.
160 void
162  jobject & callable)
163 {
164  if (callable != NULL)
165  {
169  "KnowledgeRecordFilters::add: "
170  "Adding Java record filter\n");
171 
172  // start with 1st bit, check every bit until types is 0
173  for (uint32_t cur = 1; types > 0; cur <<= 1)
174  {
175  // if current is set in the bitmask
176  if (madara::utility::bitmask_check (types, cur))
177  {
178  // append the filter to the chain for the type cur
179  filters_[cur].push_back (Function (callable));
180  }
181 
182  // remove the current flag from the types
183  types = madara::utility::bitmask_remove (types, cur);
184  }
185  }
186 }
187 
// add, Java aggregate filter overload (its name line is not present in this
// listing; only compiled when _MADARA_JAVA_ is defined). Wraps `callable`
// in an AggregateFilter and appends it to the aggregate filter list. A NULL
// callable is ignored.
188 void
190  jobject & callable)
191 {
192  if (callable != NULL)
193  {
197  "KnowledgeRecordFilters::add: "
198  "Adding Java aggregate filter\n");
199 
200  aggregate_filters_.push_back (AggregateFilter (callable));
201  }
202 }
203 
204 #endif
205 
206 #ifdef _MADARA_PYTHON_CALLBACKS_
207 
// add, Python record filter overload (its name line is not present in this
// listing; only compiled when _MADARA_PYTHON_CALLBACKS_ is defined).
// Appends `callable` to the filter chain of every type whose bit is set in
// `types`. A None callable is ignored.
208 void
210  boost::python::object & callable)
211 {
212  if (!callable.is_none ())
213  {
214  // start with 1st bit, check every bit until types is 0
215  for (uint32_t cur = 1; types > 0; cur <<= 1)
216  {
217  // if current is set in the bitmask
218  if (madara::utility::bitmask_check (types, cur))
219  {
220  // append the filter to the chain for the type cur
221  filters_[cur].push_back (Function (callable));
222  }
223 
224  // remove the current flag from the types
225  types = madara::utility::bitmask_remove (types, cur);
226  }
227  }
228 }
229 
// add, Python aggregate filter overload (its name line is not present in
// this listing; only compiled when _MADARA_PYTHON_CALLBACKS_ is defined).
// Wraps `callable` in an AggregateFilter and appends it to the aggregate
// filter list. A None callable is ignored.
230 void
232  boost::python::object & callable)
233 {
234  if (!callable.is_none ())
235  {
236  aggregate_filters_.push_back (AggregateFilter (callable));
237  }
238 }
239 
240 #endif
241 
// attach (its name line is not present in this listing): stores the context
// pointer that is later handed to filters through a Variables object.
// A null pointer is accepted and simply stored.
242 void
244  ThreadSafeContext * context)
245 {
246  context_ = context;
247 }
248 
// clear (its signature line is not present in this listing): erases the
// whole filter chain of every type whose bit is set in `types`.
249 void
251 {
252  // start with 1st bit, check every bit until types is 0
253  for (uint32_t cur = 1; types > 0; cur <<= 1)
254  {
255  // if current is set in the bitmask
256  if (madara::utility::bitmask_check (types, cur))
257  {
258  // remove the filter list from the type cur
259  filters_.erase (cur);
260  }
261 
262  // remove the current flag from the types
263  types = madara::utility::bitmask_remove (types, cur);
264  }
265 }
266 
// clear_aggregate_filters (its name line is not present in this listing):
// empties the aggregate filter list.
267 void
269  void)
270 {
271  aggregate_filters_.clear ();
272 }
273 
// clear_buffer_filters (its name line is not present in this listing):
// empties the buffer filter list. Only the stored pointers are dropped;
// nothing is deleted here.
274 void
276 void)
277 {
278  buffer_filters_.clear ();
279 }
280 
// print_num_filters (its name line is not present in this listing): logs
// the number of chained filters per type, then the number of aggregate and
// buffer filters. Nothing is printed when no context is attached.
// NOTE(review): the size () results below are formatted with %d although
// container sizes are size_t — confirm the logger handles this, or use a
// matching specifier.
281 void
283  void) const
284 {
285  if (context_)
286  {
290  "Printing Knowledge Record Filter Chains by Type...\n");
291 
292  for (FilterMap::const_iterator i = filters_.begin ();
293  i != filters_.end (); ++i)
294  {
298  "%d = %d chained filters\n", i->first, i->second.size ());
299  }
300 
304  "%d chained aggregate filters\n", aggregate_filters_.size ());
305 
309  "%d chained buffer filters\n", buffer_filters_.size ());
310  }
311 }
312 
313 
// filter, per-record overload (its return-type/name line is not present in
// this listing). Copies `input`, then runs the copy through every filter
// chained for input.type (), feeding each filter's result to the next one.
// Returns the final record, or an unmodified copy of `input` when no chain
// is registered for that type. A filter may append (name, value) pairs
// after the fixed arguments; those pairs are added to transport_context.
316  const knowledge::KnowledgeRecord & input,
317  const std::string & name,
318  transport::TransportContext & transport_context) const
319 {
320  // grab the filter chain entry for the type
321  uint32_t type = input.type ();
322  FilterMap::const_iterator type_match = filters_.find (type);
323  knowledge::KnowledgeRecord result (input);
324 
325  // if there are filters for this type
326  if (type_match != filters_.end ())
327  {
331  "KnowledgeRecordFilters::filter: "
332  "Entering record filter logic\n");
333 
334  const FilterChain & chain = type_match->second;
335  FunctionArguments arguments;
336 
337  // JVMs appear to do strange things with the stack on jni_attach
338  std::unique_ptr <Variables> heap_variables (
339  new Variables ());
340 
341  heap_variables->context_ = context_;
342 
343  for (FilterChain::const_iterator i = chain.begin ();
344  i != chain.end (); ++i)
345  {
349  "KnowledgeRecordFilters::filter: "
350  "Preparing args for filter\n");
351 
     // reset to the fixed argument count; this also drops any extra
     // (name, value) pairs appended by the previous filter in the chain
357  arguments.resize (madara::filters::TOTAL_ARGUMENTS);
358 
359  if (name != "")
360  {
361  // second argument is the variable name, if applicable
362  arguments[1].set_value (name);
363  }
364 
365  // third argument is the operation being performed
366  arguments[2].set_value (KnowledgeRecord::Integer (
367  transport_context.get_operation ()));
368 
369  // fourth argument is the send/rebroadcast bandwidth utilization
370  arguments[3].set_value (KnowledgeRecord::Integer (
371  transport_context.get_send_bandwidth ()));
372 
373  // fifth argument is the receive bandwidth utilization
374  arguments[4].set_value (KnowledgeRecord::Integer (
375  transport_context.get_receive_bandwidth ()));
376 
377  // sixth argument is the message timestamp
378  arguments[5].set_value (KnowledgeRecord::Integer (
379  transport_context.get_message_time ()));
380 
381  // seventh argument is the current timestamp
382  arguments[6].set_value (KnowledgeRecord::Integer (
383  transport_context.get_current_time ()));
384 
385  // eighth argument is the networking domain
386  arguments[7].set_value (transport_context.get_domain ());
387 
388  // ninth argument is the update originator
389  arguments[8].set_value (transport_context.get_originator ());
390 
391  // first argument is the record itself: the input for the first filter,
     // the previous filter's result for every later one
392  arguments[0] = result;
393 
397  "KnowledgeRecordFilters::filter: "
398  "Checking filter type\n");
399 
400  // optimize selection for functors, the preferred filter impl
401  if (i->is_functor ())
402  {
406  "KnowledgeRecordFilters::filter: "
407  "Calling functor filter\n");
408 
409  result = i->functor->filter (arguments, *heap_variables.get ());
410  }
411 #ifdef _MADARA_JAVA_
412  else if (i->is_java_callable ())
413  {
417  "KnowledgeRecordFilters::filter: "
418  "Calling Java filter\n");
419 
421 
     // NOTE(review): the declaration of `jvm` is not present in this
     // listing; it is used below as the holder of the JNI environment
426  jclass jvarClass = madara::utility::java::find_class (
427  jvm.env, "ai/madara/knowledge/Variables");
428  jclass jlistClass = madara::utility::java::find_class (
429  jvm.env, "ai/madara/knowledge/KnowledgeList");
430 
     // wrap the native Variables pointer in a Java Variables object
431  jmethodID fromPointerCall = jvm.env->GetStaticMethodID (jvarClass,
432  "fromPointer", "(J)Lai/madara/knowledge/Variables;");
433  jobject jvariables = jvm.env->CallStaticObjectMethod (jvarClass,
434  fromPointerCall, (jlong)heap_variables.get ());
435 
436  // prep to create the KnowledgeList
437  jmethodID listConstructor = jvm.env->GetMethodID (jlistClass,
438  "<init>", "([J)V");
439 
440  jlongArray ret = jvm.env->NewLongArray ((jsize)arguments.size ());
441  jlong * tmp = new jlong[(jsize)arguments.size ()];
442 
     // the Java side receives pointers to clones of the arguments
443  for (unsigned int x = 0; x < arguments.size (); x++)
444  {
445  tmp[x] = (jlong)arguments[x].clone ();
446  }
447 
448  jvm.env->SetLongArrayRegion (ret, 0, (jsize)arguments.size (), tmp);
449  delete[] tmp;
450 
451  // create the KnowledgeList
452  jobject jlist = jvm.env->NewObject (jlistClass, listConstructor, ret);
453 
454  // get the filter's class
455  jclass filterClass = jvm.env->GetObjectClass (i->java_object);
456 
457  // get the filter method
458  jmethodID filterMethod = jvm.env->GetMethodID (filterClass,
459  "filter",
460  "(Lai/madara/knowledge/KnowledgeList;"
461  "Lai/madara/knowledge/Variables;)Lai/madara/knowledge/KnowledgeRecord;");
462 
466  "KnowledgeRecordFilters::filter: "
467  "Calling Java method\n");
468 
469  // call the filter and hold the result
470  jobject jresult = jvm.env->CallObjectMethod (i->java_object,
471  filterMethod, jlist, jvariables);
472 
476  "KnowledgeRecordFilters::filter: "
477  "Obtaining pointer from Knowledge Record result\n");
478 
     // NOTE(review): jresult is used without a null check — confirm the
     // Java filter contract forbids returning null
479  jmethodID getPtrMethod = jvm.env->GetMethodID (
480  jvm.env->GetObjectClass (jresult), "getCPtr", "()J");
481  jlong cptr = jvm.env->CallLongMethod (jresult, getPtrMethod);
482 
486  "KnowledgeRecordFilters::filter: "
487  "Checking return for origination outside of filter\n");
488 
489  bool do_delete = true;
490  //We need to see if they returned an arg we sent them, or a new value
     // NOTE(review): the comparison is against the addresses of the
     // elements of `arguments`, while the pointers handed to Java above
     // came from clone () — confirm this check can ever match, and where
     // the clones are released
491  for (unsigned int x = 0; x < arguments.size (); x++)
492  {
493  if (cptr == (jlong)&(arguments[x]))
494  {
495  do_delete = false;
496  break;
497  }
498  }
499 
500  if (cptr != 0)
501  {
505  "KnowledgeRecordFilters::filter: "
506  "Doing a deep copy of the return value\n");
507 
508  result = *(madara::knowledge::KnowledgeRecord *)cptr;
509  }
510 
511  if (do_delete)
512  {
516  "KnowledgeRecordFilters::filter: "
517  "Deleting the original pointer\n");
518 
519  delete (KnowledgeRecord*)cptr;
520  }
521 
     // release the JNI references created for this call
522  jvm.env->DeleteWeakGlobalRef (jvarClass);
523  jvm.env->DeleteLocalRef (jvariables);
524  jvm.env->DeleteLocalRef (jresult);
525  jvm.env->DeleteLocalRef (jlist);
526  jvm.env->DeleteWeakGlobalRef (jlistClass);
527  jvm.env->DeleteLocalRef (ret);
528  }
529 #endif
530 
531 #ifdef _MADARA_PYTHON_CALLBACKS_
532 
533  else if (i->is_python_callable ())
534  {
538  "KnowledgeRecordFilters::filter: "
539  "Calling Python filter\n");
540 
541  // acquire the interpreter lock to use the python function
542  madara::python::Acquire_GIL acquire_gil;
543 
544  // some guides have stated that we should let python handle exceptions
545  result = boost::python::call <madara::knowledge::KnowledgeRecord> (
546  i->python_function.ptr (), boost::ref (arguments),
547  boost::ref (*heap_variables.get ()));
548  }
549 
550 #endif
551 
552  // otherwise, fall back to a plain C function filter if one is set
553  else if (i->is_extern_unnamed ())
554  {
558  "KnowledgeRecordFilters::filter: "
559  "Calling unnamed C filter\n");
560 
561  result = i->extern_unnamed (arguments, *heap_variables.get ());
562  }
563 
564  // did the filter add records to be sent?
565  if (arguments.size () > madara::filters::TOTAL_ARGUMENTS)
566  {
     // extras are read as (name, value) pairs; a trailing unpaired
     // element is ignored
567  for (unsigned int j = madara::filters::TOTAL_ARGUMENTS;
568  j + 1 < arguments.size (); j += 2)
569  {
570  if (arguments[j].is_string_type ())
571  {
575  "KnowledgeRecordFilters::filter: Adding %s "
576  "to transport context.\n", arguments[j].to_string ().c_str ());
577 
578  transport_context.add_record (
579  arguments[j].to_string (), arguments[j + 1]);
580  }
581  else
582  {
586  "KnowledgeRecordFilters::filter: ERROR. Filter attempted to"
587  " add records to transport context, but args[%d] was not"
588  " a string value.\n", j);
589 
     // stop at the first malformed pair; later pairs are not added
590  break;
591  }
592  }
593  }
594  }
595  }
596 
597  return result;
598 }
599 
// filter, aggregate overload (its name line is not present in this
// listing). Runs every aggregate filter, in stored order, over the whole
// `records` map; filters modify `records` in place. Does nothing when no
// aggregate filter is registered.
600 void
602  KnowledgeMap & records,
603  const transport::TransportContext & transport_context) const
604 {
605  // if there are aggregate filters
606  if (aggregate_filters_.size () > 0)
607  {
611  "KnowledgeRecordFilters::filter: "
612  "Entering aggregate filter method\n");
613 
614  // JVMs appear to do strange things with the stack on jni_attach
615  std::unique_ptr <Variables> heap_variables (
616  new Variables ());
617 
618  heap_variables->context_ = context_;
619 
620  for (AggregateFilters::const_iterator i = aggregate_filters_.begin ();
621  i != aggregate_filters_.end (); ++i)
622  {
623  // optimize selection for functors, the preferred filter impl
624  if (i->is_functor ())
625  {
629  "KnowledgeRecordFilters::filter: "
630  "Checking vars for null\n");
631 
     // NOTE(review): heap_variables was just created with plain new, so
     // this null check presumably never fails — confirm before relying
     // on the else branch
632  Variables * vars = heap_variables.get ();
633  if (vars)
634  {
638  "KnowledgeRecordFilters::filter: "
639  "Calling C++ filter\n");
640 
641  i->functor->filter (records, transport_context,
642  *vars);
643 
647  "KnowledgeRecordFilters::filter: "
648  "Finished calling C++ filter\n");
649  }
650  else
651  {
655  "KnowledgeRecordFilters::filter: "
656  "Memory issues caused vars to be null before filtering\n");
657  }
658  }
659 #ifdef _MADARA_JAVA_
660  else if (i->is_java_callable ())
661  {
662  // manage VM attachment
     // NOTE(review): the declaration of `jvm` is not present in this listing
664 
665  // JVMs appear to do strange things with the stack on jni_attach
     // the Java filter works on heap copies of the records and of the
     // transport context, not on the caller's objects
666  std::unique_ptr <KnowledgeMap> heap_records (new KnowledgeMap (records));
667  std::unique_ptr <transport::TransportContext> heap_context (
668  new transport::TransportContext (transport_context));
669 
673  jclass jvarClass = madara::utility::java::find_class (jvm.env,
674  "ai/madara/knowledge/Variables");
675  jclass jpacketClass = madara::utility::java::find_class (jvm.env,
676  "ai/madara/transport/filters/Packet");
677  jclass jcontextClass = madara::utility::java::find_class (jvm.env,
678  "ai/madara/transport/TransportContext");
679 
     // wrap each native pointer in its Java counterpart via fromPointer
680  jmethodID varfromPointerCall = jvm.env->GetStaticMethodID (
681  jvarClass,
682  "fromPointer", "(J)Lai/madara/knowledge/Variables;");
683  jobject jvariables = jvm.env->CallStaticObjectMethod (
684  jvarClass,
685  varfromPointerCall, (jlong)heap_variables.get ());
686 
687  jmethodID packetfromPointerCall = jvm.env->GetStaticMethodID (
688  jpacketClass,
689  "fromPointer", "(J)Lai/madara/transport/filters/Packet;");
690  jobject jpacket = jvm.env->CallStaticObjectMethod (jpacketClass,
691  packetfromPointerCall, (jlong)heap_records.get ());
692 
693  jmethodID contextfromPointerCall = jvm.env->GetStaticMethodID (
694  jcontextClass,
695  "fromPointer", "(J)Lai/madara/transport/TransportContext;");
696  jobject jcontext = jvm.env->CallStaticObjectMethod (jcontextClass,
697  contextfromPointerCall, (jlong)heap_context.get ());
698 
699  // get the filter's class and method
700  jclass filterClass = jvm.env->GetObjectClass (i->java_object);
701  jmethodID filterMethod = jvm.env->GetMethodID (filterClass,
702  "filter",
703  "(Lai/madara/transport/filters/Packet;"
704  "Lai/madara/transport/TransportContext;Lai/madara/knowledge/Variables;)V");
705 
706 
710  "KnowledgeRecordFilters::filter: "
711  "Calling Java method\n");
712 
713  jvm.env->CallVoidMethod (i->java_object, filterMethod,
714  jpacket, jcontext, jvariables);
715 
719  "KnowledgeRecordFilters::filter: "
720  "Overwriting resulting records from heap records\n");
721 
722  // overwrite the old records
723  //records = *heap_records.get ();
724 
     // copy every entry of the filtered heap copy back into records.
     // NOTE(review): entries the Java filter removed from the heap copy
     // are not erased from records by this loop — confirm that is
     // intended. This inner `i` also shadows the outer filter iterator.
725  for (KnowledgeMap::const_iterator i = heap_records->begin ();
726  i != heap_records->end (); ++i)
727  {
728  records[i->first] = i->second;
729  }
730 
     // NOTE(review): jvariables is not released here, unlike in the
     // per-record Java path — confirm whether a DeleteLocalRef is missing
731  jvm.env->DeleteLocalRef (filterClass);
732  jvm.env->DeleteLocalRef (jcontext);
733  jvm.env->DeleteLocalRef (jpacket);
734  jvm.env->DeleteWeakGlobalRef (jvarClass);
735  jvm.env->DeleteWeakGlobalRef (jpacketClass);
736  jvm.env->DeleteWeakGlobalRef (jcontextClass);
737  // the unique_ptrs free the heap-allocated records and context
738  }
739 #endif
740 
741 #ifdef _MADARA_PYTHON_CALLBACKS_
742 
743  else if (i->is_python_callable ())
744  {
745  // acquire the interpreter lock to use the python function
746  madara::python::Acquire_GIL acquire_gil;
747 
748  // some guides have stated that we should let python handle exceptions
     // the value returned by the Python callable is discarded
749  boost::python::call <madara::knowledge::KnowledgeRecord> (
750  i->python_function.ptr (),
751  boost::ref (records), boost::ref (transport_context),
752  boost::ref (*heap_variables.get ()));
753  }
754 
755 #endif
756 
757  // otherwise, fall back to a plain C function filter if one is set
758  else if (i->is_extern_unnamed ())
759  {
760  i->unnamed_filter (records, transport_context,
761  *heap_variables.get ());
762  }
763 
764  }
765  }
766 }
767 
// filter_decode (its return-type/name line is not present in this listing):
// calls decode on every buffer filter, from the last one added to the
// first, i.e. the reverse of the encode order. The value returned by each
// decode call is not used in the visible code.
769  unsigned char * source, int size, int max_size) const
770 {
771  // decode from back to front
772  for (filters::BufferFilters::const_reverse_iterator i = buffer_filters_.rbegin ();
773  i != buffer_filters_.rend (); ++i)
774  {
775  (*i)->decode (source, size, max_size);
776  }
777 }
778 
// filter_encode (its return-type/name line is not present in this listing):
// calls encode on every buffer filter in the order they were added. The
// value returned by each encode call is not used in the visible code, and
// the same `size` is passed to every filter.
// NOTE(review): if encode can change the payload size, later filters would
// not see the new size — confirm against the BufferFilter contract.
780  unsigned char * source, int size, int max_size) const
781 {
782  // encode from front to back
783  for (filters::BufferFilters::const_iterator i = buffer_filters_.begin ();
784  i != buffer_filters_.end (); ++i)
785  {
786  (*i)->encode (source, size, max_size);
787  }
788 }
789 
// get_number_of_filtered_types (its name line is not present in this
// listing): returns how many types currently have an entry in the filter
// map, not the total number of filters.
790 size_t
792  void) const
793 {
794  return filters_.size ();
795 }
796 
// get_number_of_aggregate_filters (its name line is not present in this
// listing): returns the number of registered aggregate filters.
797 size_t
799  void) const
800 {
801  return aggregate_filters_.size ();
802 }
803 
// get_number_of_buffer_filters (its name line is not present in this
// listing): returns the number of registered buffer filters.
804 size_t
806 void) const
807 {
808  return buffer_filters_.size ();
809 }
This class encapsulates an entry in a KnowledgeBase.
JNIEnv * env
The Java environment.
Definition: Acquire_VM.h:58
knowledge::KnowledgeRecord filter(const knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to its filter chain.
This class encapsulates attaching and detaching to a VM.
Definition: Acquire_VM.h:24
void clear(uint32_t types)
Clears the list of filters for the specified types.
int32_t type(void) const
returns the type of the value
bool bitmask_check(T mask, T values)
Returns true if mask contains values.
Definition: Utility.inl:356
helper type for specifying template type parameters using a function argument instead of inside expli...
Definition: KnowledgeCast.h:72
This class stores a function definition.
Definition: Functions.h:44
void clear_aggregate_filters(void)
Clears the aggregate filters.
void filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
void filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
void print_num_filters(void) const
Prints the number of filters chained for each type.
Abstract base class for implementing aggregate record filters via a functor interface.
filters::BufferFilters buffer_filters_
List of buffer filters.
ThreadSafeContext * context_
Context used by this filter.
void attach(ThreadSafeContext *context)
Attaches a context.
This class stores variables and their values for use by any entity needing state information in a thr...
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
Abstract base class for implementing individual record filters via a functor interface.
Definition: RecordFilter.h:33
Provides map of data types to a filter chain to apply to the data.
std::vector< KnowledgeRecord > FunctionArguments
Provides context about the transport.
This class stores a function definition.
uint64_t get_receive_bandwidth(void) const
Gets the receive bandwidth in bytes per second.
#define madara_logger_cond_log(conditional, logger, alt_logger_ptr, level,...)
High-performance logger that performs conditional logging based on first arg.
Definition: Logger.h:56
size_t get_number_of_buffer_filters(void) const
Returns the number of buffer filters.
static struct madara::knowledge::tags::string_t string
uint64_t get_current_time(void) const
Gets the current timestamp.
void clear_buffer_filters(void)
Clears the buffer filters.
int64_t get_operation(void) const
Get operation that the context is performing.
AggregateFilters aggregate_filters_
List of aggregate filters.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void add_record(const std::string &key, const madara::knowledge::KnowledgeRecord &record)
Adds a record to the list that should be appended to send or rebroadcast.
size_t get_number_of_filtered_types(void) const
Returns the number of types that have filters.
uint64_t get_message_time(void) const
Gets the message timestamp.
size_t get_number_of_aggregate_filters(void) const
Returns the number of aggregate update filters.
T bitmask_remove(T mask, T values)
Removes values from a bit mask.
Definition: Utility.inl:365
void operator=(const KnowledgeRecordFilters &rhs)
Assignment operator.
This class encapsulates accessing the global interpreter lock.
Definition: Acquire_GIL.h:24
const std::string & get_domain(void) const
Returns the network domain.
const std::string & get_originator(void) const
Returns the current message originator.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Abstract base class for implementing buffer filters via a functor interface.
Definition: BufferFilter.h:26
void add(uint32_t types, knowledge::KnowledgeRecord(*function)(FunctionArguments &, Variables &))
Adds a filter to the list of types.
knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=knowledge::KnowledgeReferenceSettings(false))
Retrieves the value of a variable.
Provides an interface for external functions into the MADARA KaRL variable settings.
std::list< Function > FilterChain
a chain of filters
FilterMap filters_
Container for mapping types to filter chains.
uint64_t get_send_bandwidth(void) const
Gets the send/rebroadcast bandwidth in bytes per second.