MADARA  3.2.3
Threader.cpp
Go to the documentation of this file.
1 #include "Threader.h"
4 
5 #ifdef _MADARA_JAVA_
6 
7 #include "java/JavaThread.h"
8 
9 #endif // _MADARA_JAVA_
10 
12 {
// Default constructor: the body is empty, so no explicit work is done here.
13 
14 }
15 
17  knowledge::KnowledgeBase data_plane)
18  : data_ (std::move(data_plane))
19 {
// The by-value data_plane argument is moved into the data_ member by the
// initializer list above; the body itself does nothing else.
20 }
21 
23 {
// Destructor: first request termination of the managed threads, then wait
// on them before the object is torn down.
24  terminate ();
25  wait ();
26 }
27 
// Pauses one thread by name. If `name` is a key in threads_, the control-plane
// variable "<name>.paused" is set to 1; an unknown name is silently ignored.
28 void
30 {
31  NamedWorkerThreads::iterator found = threads_.find (name);
32 
33  if (found != threads_.end ())
34  {
// Only names that are actually registered get a pause flag written.
35  control_.set (name + ".paused", knowledge::KnowledgeRecord::Integer (1));
36  }
37 }
38 
// Pauses every registered thread: for each entry in threads_, sets the
// control-plane variable "<thread name>.paused" to 1.
39 void
41 {
42  for (NamedWorkerThreads::iterator i = threads_.begin ();
43  i != threads_.end (); ++i)
44  {
// i->first is the thread's name (the key of threads_).
45  control_.set (i->first + ".paused", knowledge::KnowledgeRecord::Integer (1));
46  }
47 }
48 
// Resumes one thread by name. If `name` is a key in threads_, the
// control-plane variable "<name>.paused" is set to 0; an unknown name is
// silently ignored.
49 void
51 {
52  NamedWorkerThreads::iterator found = threads_.find (name);
53 
54  if (found != threads_.end ())
55  {
// Clearing the flag is the inverse of what pause writes (1).
56  control_.set (name + ".paused", knowledge::KnowledgeRecord::Integer (0));
57  }
58 }
59 
// Resumes every registered thread: for each entry in threads_, sets the
// control-plane variable "<thread name>.paused" to 0.
60 void
62 {
63  for (NamedWorkerThreads::iterator i = threads_.begin ();
64  i != threads_.end (); ++i)
65  {
// i->first is the thread's name (the key of threads_).
66  control_.set (i->first + ".paused", knowledge::KnowledgeRecord::Integer (0));
67  }
68 }
69 
// Starts a user thread under the given name.
//
//  name    key under which the worker is stored in threads_; must be non-empty
//  thread  user thread logic; must be non-null
//  paused  when true, thread->paused is set to 1 before the worker is run
//
// Three cases are handled:
//  * non-empty name and non-null thread: a WorkerThread is built from
//    (name, thread, control_, data_), stored in threads_[name] and run;
//  * non-null thread with an empty name: the thread is deleted and the
//    "empty name" error is reported;
//  * otherwise the thread pointer is null: a "thread is null" error is
//    reported (the previous text wrongly blamed an empty name).
// NOTE(review): the lines that open the two error statements in each error
// branch are not visible here; presumably a logger call and a thrown
// exception -- confirm against the full file.
70 void
72  const std::string name, BaseThread * thread, bool paused)
73 {
74  if (name != "" && thread != 0)
75  {
76  std::unique_ptr<WorkerThread> worker (new WorkerThread (name, thread,
77  control_, data_));
78 
// The paused flag is set before run () so the worker starts out paused.
79  if (paused)
80  thread->paused = 1;
81 
// Store the worker under its name first, then start it via the stored pointer.
82  (threads_[name] = std::move (worker))->run ();
83  }
84  else if (thread != 0 && name == "")
85  {
// The thread cannot be registered without a name, so it is freed here.
86  delete thread;
87 
89  "Threader::run: named thread has an empty name. Deleting new thread.");
90 
92  "Threader::run: named thread has an empty name. Deleting new thread.");
93  }
94  else
95  {
// Reaching this branch implies thread == 0, whatever the name is.
97  "Threader::run: thread is null.");
98 
100  "Threader::run: thread is null.");
101  }
102 }
103 
104 #ifdef _MADARA_JAVA_
105 
// Java-facing overload (compiled only under _MADARA_JAVA_): wraps the Java
// object in a JavaThread and forwards to the BaseThread overload of run.
// Nothing happens, and no error is reported here, when the name is empty,
// the jobject is null, or JavaThread::create returns null.
106 void
108  const std::string name, jobject thread, bool paused)
109 {
110  if (name != "" && thread != 0)
111  {
112  // attempt to create a Java Thread
113  JavaThread * new_thread = JavaThread::create (thread);
114 
115  // if successful, run the thread
116  if (new_thread)
117  run (name, new_thread, paused);
118  }
119 }
120 
121 
// Java-facing overload with a rate (compiled only under _MADARA_JAVA_): wraps
// the Java object in a JavaThread and forwards to the hertz-taking BaseThread
// overload of run. Nothing happens, and no error is reported here, when the
// name is empty, the jobject is null, or JavaThread::create returns null.
122 void
124  double hertz, const std::string name, jobject thread, bool paused)
125 {
126  if (name != "" && thread != 0)
127  {
128  // attempt to create a Java Thread
129  JavaThread * new_thread = JavaThread::create (thread);
130 
131  // if successful, run the thread
132  if (new_thread)
133  run (hertz, name, new_thread, paused);
134  }
135 }
136 
137 #endif // _MADARA_JAVA_
138 
// Starts a user thread under the given name, passing `hertz` through to the
// WorkerThread.
//
//  hertz   forwarded unchanged as the last WorkerThread constructor argument
//          (its declaration is on a signature line not visible here)
//  name    key under which the worker is stored in threads_; must be non-empty
//  thread  user thread logic; must be non-null
//  paused  when true, thread->paused is set to 1 before the worker is run
//
// Three cases are handled:
//  * non-empty name and non-null thread: a WorkerThread is built from
//    (name, thread, control_, data_, hertz), stored in threads_[name] and run;
//  * non-null thread with an empty name: the thread is deleted and the
//    "empty name" error is reported;
//  * otherwise the thread pointer is null: a "thread is null" error is
//    reported (the previous text wrongly blamed an empty name).
// NOTE(review): the lines that open the two error statements in each error
// branch are not visible here; presumably a logger call and a thrown
// exception -- confirm against the full file.
139 void
141  const std::string name, BaseThread * thread, bool paused)
142 {
143  if (name != "" && thread != 0)
144  {
145  std::unique_ptr<WorkerThread> worker (new WorkerThread (name, thread,
146  control_, data_, hertz));
147 
// The paused flag is set before run () so the worker starts out paused.
148  if (paused)
149  thread->paused = 1;
150 
// Store the worker under its name first, then start it via the stored pointer.
151  (threads_[name] = std::move (worker))->run ();
152  }
153  else if (thread != 0 && name == "")
154  {
// The thread cannot be registered without a name, so it is freed here.
155  delete thread;
156 
158  "Threader::run: named thread has an empty name. Deleting new thread.");
159 
161  "Threader::run: named thread has an empty name. Deleting new thread.");
162  }
163  else
164  {
// Reaching this branch implies thread == 0, whatever the name is.
166  "Threader::run: thread is null.");
167 
169  "Threader::run: thread is null.");
170  }
171 }
172 
174  knowledge::KnowledgeBase data_plane)
175 {
// Replace the stored data plane by moving the by-value argument into data_.
// Workers already created were handed the previous data_ at construction.
176  data_ = std::move(data_plane);
177 }
178 
// Requests termination of one thread by name. If `name` is a key in threads_,
// the control-plane variable "<name>.terminated" is set to 1; an unknown name
// is silently ignored. The entry is not removed from threads_ here.
179 void
181 {
182  NamedWorkerThreads::iterator found = threads_.find (name);
183 
184  if (found != threads_.end ())
185  {
186  control_.set (name + ".terminated", knowledge::KnowledgeRecord::Integer (1));
187  }
188 }
189 
// Requests termination of every registered thread: for each entry in
// threads_, sets the control-plane variable "<thread name>.terminated" to 1.
// Entries are not removed from threads_ here.
190 void
192 {
193  for (NamedWorkerThreads::iterator i = threads_.begin ();
194  i != threads_.end (); ++i)
195  {
// i->first is the thread's name (the key of threads_).
196  control_.set (i->first + ".terminated", knowledge::KnowledgeRecord::Integer (1));
197  }
198 }
199 
// Waits for one named thread to report completion.
//
// Returns true when the control-plane wait on the thread's finished_ variable
// yields a true record, in which case the entry is also erased from threads_.
// Returns false when `name` is not registered, when the wait result is not
// true, or when the code is built with _MADARA_NO_KARL_ defined (the whole
// lookup is compiled out).
200 bool
202  const knowledge::WaitSettings & ws)
203 {
204  bool result (false);
205 
206 #ifndef _MADARA_NO_KARL_
207  NamedWorkerThreads::iterator found = threads_.find (name);
208 
209  if (found != threads_.end ())
210  {
// The wait expression is just the name of the worker's finished_ variable.
211  std::string condition = found->second->finished_.get_name ();
212 
213  result = this->control_.wait (condition, ws).is_true ();
214 
// Only forget the thread once it has actually reported finishing.
215  if (result)
216  {
217  threads_.erase (found);
218  }
219  }
220 #endif // _MADARA_NO_KARL_
221 
222  return result;
223 }
224 
// Waits for all registered threads to report completion.
//
// Builds one expression of the form "a&&b&&c" from the names of every
// worker's finished_ variable and waits on it through the control plane using
// `ws`. On a true result threads_ is cleared and true is returned.
// Returns false when the wait result is not true, when threads_ is empty
// (no wait is issued and result keeps its initial value), or when the code is
// built with _MADARA_NO_KARL_ defined.
225 bool
227 {
228  bool result (false);
229 
230 #ifndef _MADARA_NO_KARL_
231  std::stringstream condition;
232 
233  NamedWorkerThreads::iterator i = threads_.begin ();
234 
235  // create a condition with the first thread's finished state
236  if (i != threads_.end ())
237  {
238  condition << i->second->finished_.get_name ();
239  ++i;
240  }
241 
242  // add each other thread to the condition
243  for (; i != threads_.end (); ++i)
244  {
245  condition << "&&";
246  condition << i->second->finished_.get_name ();
247  }
248 
// With no threads the condition string is empty, so the wait is skipped.
249  if (threads_.size () > 0)
250  {
251  result = this->control_.wait (condition.str (), ws).is_true ();
252  }
253 
// Every thread has finished: drop all of the workers at once.
254  if (result)
255  {
256  threads_.clear ();
257  }
258 #endif // _MADARA_NO_KARL_
259 
260  return result;
261 }
bool is_true(void) const
Checks to see if the record is true.
madara::knowledge::containers::Integer paused
thread safe paused flag that may be set by the Threader
Definition: BaseThread.h:107
STL namespace.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
void terminate(void)
Requests all threads to terminate.
Definition: Threader.cpp:191
knowledge::KnowledgeBase control_
The control plane used by threads for termination and pause information.
Definition: Threader.h:234
Threader()
Default constructor.
Definition: Threader.cpp:11
NamedWorkerThreads threads_
the threads that are still active
Definition: Threader.h:239
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:71
static struct madara::knowledge::tags::string_t string
void pause(void)
Requests all threads to pause.
Definition: Threader.cpp:40
Abstract base class for implementing threads.
Definition: BaseThread.h:38
madara::knowledge::KnowledgeRecord wait(const std::string &expression, const WaitSettings &settings=WaitSettings())
Waits for an expression to be non-zero.
knowledge::KnowledgeBase data_
The data plane used by threads.
Definition: Threader.h:227
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
A thread that executes BaseThread logic.
Definition: WorkerThread.h:33
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
~Threader()
Destructor.
Definition: Threader.cpp:22
An exception for general thread-related errors.
void resume(void)
Requests all threads to resume (unpause).
Definition: Threader.cpp:61
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
void set_data_plane(knowledge::KnowledgeBase data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:173
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:23
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:201
A facade for a user-defined Java thread class.
Definition: JavaThread.h:26
static JavaThread * create(jobject obj)
Creates a JavaThread.
Definition: JavaThread.cpp:121