MADARA  3.4.1
WorkerThread.cpp
Go to the documentation of this file.
1 #include "WorkerThread.h"
4 
5 #ifdef _MADARA_JAVA_
6 #include <jni.h>
7 #include "madara_jni.h"
9 #endif
10 
11 #ifndef _WIN32
12 #include <pthread.h>
13 #endif
14 
15 #include <iostream>
16 #include <algorithm>
17 
18 namespace madara
19 {
20 namespace threads
21 {
24  double hertz)
25  : name_(name), thread_(thread), control_(control), data_(data), hertz_(hertz)
26 {
27  if (thread)
28  {
29  std::stringstream base_string;
30 
31  knowledge::KnowledgeBase* kb = &control;
32  knowledge::KnowledgeRecord debug_to_kb = control_.get(".debug_to_kb");
33  if (debug_to_kb.exists())
34  {
35  base_string << debug_to_kb.to_string() << ".";
36  kb = &data;
37  data.set(debug_to_kb.to_string() + ".hertz", hertz,
39  }
40  base_string << name;
41 
42  thread->name = name;
43  thread->init_control_vars(control);
44 
45  control_.get(".debug_to_kb").to_string();
46 
47  finished_.set_name(base_string.str() + ".finished", control);
48  started_.set_name(base_string.str() + ".started", control);
49  new_hertz_.set_name(base_string.str() + ".hertz", control);
50 
51  executions_.set_name(base_string.str() + ".executions", *kb);
52  start_time_.set_name(base_string.str() + ".start_time", *kb);
53  last_start_time_.set_name(base_string.str() + ".last_start_time", *kb);
54  end_time_.set_name(base_string.str() + ".end_time", *kb);
55 
56  last_duration_.set_name(base_string.str() + ".last_duration", *kb);
57  min_duration_.set_name(base_string.str() + ".min_duration", *kb);
58  max_duration_.set_name(base_string.str() + ".max_duration", *kb);
59 
60  debug_.set_name(base_string.str() + ".debug", control);
61 
62  paused_.set_name(base_string.str() + ".paused", *kb);
63  running_.set_name(base_string.str() + ".running", *kb);
64 
65  finished_ = 0;
66  started_ = 0;
68  }
69 }
70 
72 {
73  try
74  {
75  if (me_.joinable())
76  {
78  "WorkerThread::~WorkerThread(%s):"
79  " thread wasn't joined before destruction\n",
80  name_.c_str());
81  me_.detach();
82  }
83  }
84  catch (const std::system_error& e)
85  {
87  "WorkerThread::~WorkerThread(%s):"
88  " error trying to detach: %s\n",
89  name_.c_str(), e.what());
90  }
91 }
92 
93  /*
94  void
95  WorkerThread::operator= (const WorkerThread & input)
96  {
97  if (this != &input)
98  {
99  this->name_ = input.name_;
100  this->thread_ = input.thread_;
101  this->control_ = input.control_;
102  this->data_ = input.data_;
103  this->finished_ = input.finished_;
104  this->started_ = input.started_;
105  this->new_hertz_ = input.new_hertz_;
106  this->hertz_ = input.hertz_;
107  }
108  }*/
109 
110 #ifndef _WIN32
111 // Call pthread_setname_np with the perfectly-forwarded arguments and return its result; the trailing decltype return type removes this overload from consideration (SFINAE) when that call expression is not well-formed for the given arguments.
112 template<typename... Args>
113 auto try_pthread_setname_np(Args&&... args)
114  -> decltype(pthread_setname_np(std::forward<Args>(args)...))
115 {
116  return pthread_setname_np(std::forward<Args>(args)...);
117 }
118 
119 // Otherwise, do nothing
121 #endif
122 
124 {
125  try
126  {
127  me_ = std::thread(&WorkerThread::svc, this);
128 
129 #ifndef _WIN32
130  try_pthread_setname_np(me_.native_handle(), name_.substr(0, 15).c_str());
131 #endif
132 
133  std::ostringstream os;
134  os << std::this_thread::get_id() << " spawned " << me_.get_id()
135  << std::endl;
136 
138  "WorkerThread::WorkerThread(%s):"
139  " thread started %s\n",
140  name_.c_str(), os.str().c_str());
141  }
142  catch (const std::exception& e)
143  {
145  "WorkerThread::WorkerThread(%s):"
146  " failed to create thread: %s\n",
147  name_.c_str(), e.what());
148  throw;
149  }
150 }
151 
153 {
155  "WorkerThread(%s)::svc:"
156  " checking thread existence\n",
157  name_.c_str());
158 
159  if (thread_)
160  {
161  started_ = 1;
162 
163 #ifdef _MADARA_JAVA_
164  // try detaching one more time, just to make sure.
165  utility::java::Acquire_VM jvm(false);
166 #endif
167 
168 #if 0
169  madara::logger::Logger::set_thread_name(name_);
170 #endif
171 
172  thread_->init(data_);
173 
174  {
176  utility::TimeValue next_epoch;
177  utility::Duration frequency;
178 
179  // only allow one-way communication of durations. We never read control
180  int64_t min_duration = -1;
181  int64_t max_duration = 0;
182  int64_t last_duration = 0;
183  bool max_duration_changed = true;
184  bool min_duration_changed = true;
185 
186  bool one_shot = true;
187  bool blaster = false;
188 
189  bool debug = debug_.is_true();
190 
191  knowledge::VariableReference terminated;
192 
193  terminated = control_.get_ref(name_ + ".terminated");
194 
195  // change thread frequency
197  hertz_, current, frequency, next_epoch, one_shot, blaster);
198 #if 0
199  madara::logger::Logger::set_thread_hertz(hertz_);
200 #endif
201 
202  if (debug)
203  {
205  }
206 
207  while (control_.get(terminated).is_false())
208  {
210  "WorkerThread(%s)::svc:"
211  " thread checking for pause\n",
212  name_.c_str());
213 
214  if (paused_.is_false())
215  {
216  running_ = 1;
217 
219  "WorkerThread(%s)::svc:"
220  " thread calling run function\n",
221  name_.c_str());
222 
223  try
224  {
225  int64_t start_time = 0, end_time = 0;
226  debug = debug_.is_true();
227 
228  if (debug)
229  {
230  start_time = utility::get_time();
231  ++executions_;
232  } // debug
233 
234  thread_->run();
235 
236  if (debug)
237  {
238  end_time = utility::get_time();
239 
240  // update duration information
241  last_duration = end_time - start_time;
242  if (min_duration == -1 || last_duration < min_duration)
243  {
244  min_duration = last_duration;
245  min_duration_changed = true;
246  }
247  if (last_duration > max_duration)
248  {
249  max_duration = last_duration;
250  max_duration_changed = true;
251  }
252 
253  // lock control plane and update
254  {
255  // write updates to control
257  last_start_time_ = start_time;
258  end_time_ = end_time;
259 
260  last_duration_ = last_duration;
261  if (max_duration_changed)
262  {
263  max_duration_ = max_duration;
264  }
265  if (min_duration_changed)
266  {
267  min_duration_ = min_duration;
268  }
269  } // end lock of control plane
270  } // end if debug
271  } // end try of the run
272  catch (const std::exception& e)
273  {
276  "WorkerThread(%s)::svc:"
277  " exception thrown: %s\n",
278  name_.c_str(), e.what());
279  }
280 
281  running_ = 0;
282  }
283 
284  if (one_shot)
285  break;
286 
287  // check for a change in frequency/hertz
288  if (new_hertz_ != hertz_)
289  {
291  *new_hertz_, current, frequency, next_epoch, one_shot, blaster);
292  }
293 
294  if (!blaster)
295  {
296  current = utility::get_time_value();
297 
299  "WorkerThread(%s)::svc:"
300  " thread checking for next hertz epoch\n",
301  name_.c_str());
302 
303  if (current < next_epoch)
304  utility::sleep(next_epoch - current);
305 
307  "WorkerThread(%s)::svc:"
308  " thread past epoch\n",
309  name_.c_str());
310 
311  next_epoch += frequency;
312  }
313  } // end while !terminated
314 
316  "WorkerThread(%s)::svc:"
317  " thread has been terminated\n",
318  name_.c_str());
319  }
320 
322  "WorkerThread(%s)::svc:"
323  " calling thread cleanup method\n",
324  name_.c_str());
325 
326  thread_->cleanup();
327 
329  "WorkerThread(%s)::svc:"
330  " deleting thread\n",
331  name_.c_str());
332 
334  "WorkerThread(%s)::svc:"
335  " setting finished to 1\n",
336  finished_.get_name().c_str());
337 
338  finished_ = 1;
339  }
340  else
341  {
343  "WorkerThread(%s)::svc:"
344  " thread creation failed\n",
345  name_.c_str());
346  }
347 
348  return 0;
349 }
350 }
351 }
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:24
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Atomically returns a reference to the variable.
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(true, false, true, false, false))
Atomically sets the value of a variable to a string.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Atomically gets the current value of a variable (without any history).
This class encapsulates an entry in a KnowledgeBase.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
bool is_false(void) const
Checks to see if the record is false.
Optimized reference to a variable within the knowledge base.
std::string get_name(void) const
Returns the name of the container.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Double.cpp:121
bool is_false(void) const
Determines if the value is zero.
Definition: Integer.inl:246
bool is_true(void) const
Determines if the value is true.
Definition: Integer.inl:226
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:54
Abstract base class for implementing threads.
Definition: BaseThread.h:39
std::string name
The unique name of your thread.
Definition: BaseThread.h:90
virtual void init_control_vars(knowledge::KnowledgeBase &control)
Initializes the thread implementation's control plane variables.
Definition: BaseThread.h:78
knowledge::containers::Double new_hertz_
thread safe hertz reference
Definition: WorkerThread.h:153
knowledge::containers::Integer end_time_
timestamp of the last svc end
Definition: WorkerThread.h:174
knowledge::containers::Integer max_duration_
maximum duration of all runs
Definition: WorkerThread.h:189
void run(void)
Starts the thread, with entry point svc()
WorkerThread()
Default constructor.
Definition: WorkerThread.h:41
knowledge::containers::Integer start_time_
timestamp of the initial svc start
Definition: WorkerThread.h:164
knowledge::containers::Integer executions_
thread-safe count of run executions; incremented in svc() before each call to the thread's run function while debug is enabled
Definition: WorkerThread.h:159
knowledge::containers::Integer finished_
thread safe finished flag that will be sent to the knowledge base on completion of the thread
Definition: WorkerThread.h:130
knowledge::KnowledgeBase data_
the data plane (the knowledge base)
Definition: WorkerThread.h:124
knowledge::KnowledgeBase control_
the control plane to the knowledge base
Definition: WorkerThread.h:121
knowledge::containers::Integer debug_
flag for whether or not to save debug information in control
Definition: WorkerThread.h:194
knowledge::containers::Integer started_
thread safe start flag that will be sent to the knowledge base on launch of the thread
Definition: WorkerThread.h:148
knowledge::containers::Integer last_duration_
duration of last run
Definition: WorkerThread.h:179
std::unique_ptr< BaseThread > thread_
the contained thread
Definition: WorkerThread.h:118
std::thread me_
the underlying std::thread, started by run() with svc() as its entry point
Definition: WorkerThread.h:88
double hertz_
hertz rate for worker thread executions
Definition: WorkerThread.h:199
madara::knowledge::containers::Integer paused_
thread safe paused flag that will be sent to the knowledge base to indicate thread activities should ...
Definition: WorkerThread.h:136
~WorkerThread() noexcept
Destructor.
std::string name_
the name of the contained thread
Definition: WorkerThread.h:115
madara::knowledge::containers::Integer running_
thread-safe running flag; set to 1 while the thread's run function is executing and reset to 0 afterwards
Definition: WorkerThread.h:142
knowledge::containers::Integer min_duration_
minimum duration of all runs
Definition: WorkerThread.h:184
void change_frequency(double hertz, utility::TimeValue &current, utility::Duration &frequency, utility::TimeValue &next_epoch, bool &one_shot, bool &blaster)
Changes the frequency given a hertz rate.
knowledge::containers::Integer last_start_time_
timestamp of the last svc start
Definition: WorkerThread.h:169
This class encapsulates attaching and detaching to a VM.
Definition: Acquire_VM.h:25
constexpr string_t string
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
Provides a quality-of-service-enabled threading library.
Definition: BaseThread.h:28
auto try_pthread_setname_np(Args &&... args) -> decltype(pthread_setname_np(std::forward< Args >(args)...))
std::chrono::time_point< Clock > TimeValue
time point
Definition: Utility.h:36
std::chrono::nanoseconds Duration
default clock duration
Definition: Utility.h:30
double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:555
int64_t get_time(void)
Returns a time of day in nanoseconds. If the simtime feature is enabled, this may be simulation time instead...
Definition: Utility.inl:265
TimeValue get_time_value(void)
Returns a time of day as a chrono time value. If the simtime feature is enabled, this may be simulation ti...
Definition: Utility.inl:256
Copyright(c) 2020 Galois.
static const EvalSettings DELAY_NO_EXPAND
Settings to delay send modifieds and not expand variables.
Definition: EvalSettings.h:48