MADARA  3.4.1
Threader.cpp
Go to the documentation of this file.
1 #include "Threader.h"
4 
5 #ifdef _MADARA_JAVA_
6 
7 #include "java/JavaThread.h"
8 
9 #endif // _MADARA_JAVA_
10 
12 
14  : data_(std::move(data_plane))
15 {
16 }
17 
19 {
20  terminate();
21  wait();
22 }
23 
25 {
26  NamedWorkerThreads::iterator found = threads_.find(name);
27 
28  if (found != threads_.end())
29  {
30  control_.set(name + ".paused", knowledge::KnowledgeRecord::Integer(1));
31  }
32 }
33 
35 {
36  for (NamedWorkerThreads::iterator i = threads_.begin(); i != threads_.end();
37  ++i)
38  {
39  control_.set(i->first + ".paused", knowledge::KnowledgeRecord::Integer(1));
40  }
41 }
42 
44 {
45  NamedWorkerThreads::iterator found = threads_.find(name);
46 
47  if (found != threads_.end())
48  {
49  control_.set(name + ".paused", knowledge::KnowledgeRecord::Integer(0));
50  }
51 }
52 
54 {
55  for (NamedWorkerThreads::iterator i = threads_.begin(); i != threads_.end();
56  ++i)
57  {
58  control_.set(i->first + ".paused", knowledge::KnowledgeRecord::Integer(0));
59  }
60 }
61 
63  const std::string& name, BaseThread* thread, bool paused)
64 {
65  if (name != "" && thread != 0)
66  {
67  std::unique_ptr<WorkerThread> worker(
68  new WorkerThread(name, thread, control_, data_));
69 
70  if (paused)
71  thread->paused = 1;
72 
73  if (debug_)
74  worker->debug_ = 1;
75 
76  (threads_[name] = std::move(worker))->run();
77  }
78  else if (thread != 0 && name == "")
79  {
80  delete thread;
81 
83  "Threader::run: named thread has an empty name. Deleting new thread.");
84 
86  "Threader::run: named thread has an empty name. Deleting new thread.");
87  }
88  else
89  {
91  "Threader::run: named thread has an empty name.");
92 
94  "Threader::run: named thread has an empty name.");
95  }
96 }
97 
98 #ifdef _MADARA_JAVA_
99 
101  const std::string& name, jobject thread, bool paused)
102 {
103  if (name != "" && thread != 0)
104  {
105  // attempt to create a Java Thread
106  JavaThread* new_thread = JavaThread::create(thread);
107 
108  // if successful, run the thread
109  if (new_thread)
110  {
111  run(name, new_thread, paused);
112  }
113  }
114 }
115 
117  double hertz, const std::string& name, jobject thread, bool paused)
118 {
119  if (name != "" && thread != 0)
120  {
121  // attempt to create a Java Thread
122  JavaThread* new_thread = JavaThread::create(thread);
123 
124  // if successful, run the thread
125  if (new_thread)
126  run(hertz, name, new_thread, paused);
127  }
128 }
129 
130 #endif // _MADARA_JAVA_
131 
133  double hertz, const std::string& name, BaseThread* thread, bool paused)
134 {
135  if (name != "" && thread != 0)
136  {
137  std::unique_ptr<WorkerThread> worker(
138  new WorkerThread(name, thread, control_, data_, hertz));
139 
140  if (paused)
141  thread->paused = 1;
142 
143  if (debug_)
144  worker->debug_ = 1;
145 
146  (threads_[name] = std::move(worker))->run();
147  }
148  else if (thread != 0 && name == "")
149  {
150  delete thread;
151 
153  "Threader::run: named thread has an empty name. Deleting new thread.");
154 
156  "Threader::run: named thread has an empty name. Deleting new thread.");
157  }
158  else
159  {
161  "Threader::run: named thread has an empty name.");
162 
164  "Threader::run: named thread has an empty name.");
165  }
166 }
167 
169  knowledge::KnowledgeBase& data_plane)
170 {
171  data_ = std::move(data_plane);
172 }
173 
175 {
176  NamedWorkerThreads::iterator found = threads_.find(name);
177 
178  if (found != threads_.end())
179  {
180  control_.set(name + ".terminated", knowledge::KnowledgeRecord::Integer(1));
181  }
182 }
183 
185 {
186  for (NamedWorkerThreads::iterator i = threads_.begin(); i != threads_.end();
187  ++i)
188  {
189  control_.set(
190  i->first + ".terminated", knowledge::KnowledgeRecord::Integer(1));
191  }
192 }
193 
195  const std::string& name, const knowledge::WaitSettings& ws)
196 {
197  bool result(false);
198 
199 #ifndef _MADARA_NO_KARL_
200  NamedWorkerThreads::iterator found = threads_.find(name);
201 
202  if (found != threads_.end())
203  {
204  std::string condition = found->second->finished_.get_name();
205 
206  result = this->control_.wait(condition, ws).is_true();
207 
208  if (result)
209  {
210  threads_.erase(found);
211  }
212  }
213 #endif // _MADARA_NO_KARL_
214 
215  return result;
216 }
217 
219 {
220  bool result(false);
221 
222 #ifndef _MADARA_NO_KARL_
223  std::stringstream condition;
224 
225  NamedWorkerThreads::iterator i = threads_.begin();
226 
227  // create a condition with the first thread's finished state
228  if (i != threads_.end())
229  {
230  condition << i->second->finished_.get_name();
231  ++i;
232  }
233 
234  // add each other thread to the condition
235  for (; i != threads_.end(); ++i)
236  {
237  condition << "&&";
238  condition << i->second->finished_.get_name();
239  }
240 
241  if (threads_.size() > 0)
242  {
243  result = this->control_.wait(condition.str(), ws).is_true();
244  }
245 
246  if (result)
247  {
248  threads_.clear();
249  }
250 #endif // _MADARA_NO_KARL_
251 
252  return result;
253 }
254 
256  const std::string& name, const knowledge::WaitSettings& ws)
257 {
258  bool result(false);
259 
260 #ifndef _MADARA_NO_KARL_
261  NamedWorkerThreads::iterator found = threads_.find(name);
262 
263  if (found != threads_.end())
264  {
265  std::string condition = "!";
266  condition += found->second->running_.get_name();
267 
268  result = this->control_.wait(condition, ws).is_true();
269  }
270 #endif // _MADARA_NO_KARL_
271 
272  return result;
273 }
274 
276  const knowledge::WaitSettings& ws)
277 {
278  bool result(false);
279 
280 #ifndef _MADARA_NO_KARL_
281  std::stringstream condition;
282 
283  NamedWorkerThreads::iterator i = threads_.begin();
284 
285  // create a condition with the first thread's finished state
286  if (i != threads_.end())
287  {
288  condition << "!" <<
289  i->second->running_.get_name();
290  ++i;
291  }
292 
293  // add each other thread to the condition
294  for (; i != threads_.end(); ++i)
295  {
296  condition << "&& !";
297  condition << i->second->running_.get_name();
298  }
299 
300  if (threads_.size() > 0)
301  {
302  result = this->control_.wait(condition.str(), ws).is_true();
303  }
304 
305 #endif // _MADARA_NO_KARL_
306 
307  return result;
308 }
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
An exception for general thread-related errors.
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
Abstract base class for implementing threads.
Definition: BaseThread.h:39
madara::knowledge::containers::Integer paused
thread safe paused flag that may be set by the Threader
Definition: BaseThread.h:101
A facade for a user-defined Java thread class.
Definition: JavaThread.h:28
static JavaThread * create(jobject obj)
Creates a JavaThread.
Definition: JavaThread.cpp:120
void pause(void)
Requests all threads to pause.
Definition: Threader.cpp:34
void terminate(void)
Requests all threads to terminate.
Definition: Threader.cpp:184
void run(const std::string &name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:62
bool wait_for_paused(const std::string &name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to reach paused state.
Definition: Threader.cpp:255
~Threader()
Destructor.
Definition: Threader.cpp:18
Threader()
Default constructor.
Definition: Threader.cpp:11
void resume(void)
Requests all threads to resume (unpause)
Definition: Threader.cpp:53
bool wait(const std::string &name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:194
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:168
A thread that executes BaseThread logic.
Definition: WorkerThread.h:33
constexpr string_t string
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:25