MADARA  3.4.1
TransportSettings.cpp
Go to the documentation of this file.
1 #include "TransportSettings.h"
2 #include "Fragmentation.h"
6 
9 
11 {
12  if (NO_TRANSPORT == id)
13  {
14  return "None";
15  }
16  if (SPLICE == id)
17  {
18  return "Splice DDS";
19  }
20  if (NDDS == id)
21  {
22  return "RTI DDS";
23  }
24  if (UDP == id)
25  {
26  return "UDP";
27  }
28  if (TCP == id)
29  {
30  return "TCP (unsupported)";
31  }
32  if (MULTICAST == id)
33  {
34  return "UDP Multicast";
35  }
36  if (BROADCAST == id)
37  {
38  return "UDP Broadcast";
39  }
40  if (REGISTRY_SERVER == id)
41  {
42  return "UDP Registry Server";
43  }
44  if (REGISTRY_CLIENT == id)
45  {
46  return "UDP Registry Client";
47  }
48  if (ZMQ == id)
49  {
50  return "0MQ";
51  }
52 
53  // otherwise, it's a custom transport
54  return "Custom";
55 }
56 
58  const TransportSettings& settings)
59  : write_domain(settings.write_domain),
60  read_threads(settings.read_threads),
61  queue_length(settings.queue_length),
62  type(settings.type),
63  max_fragment_size(settings.max_fragment_size),
64  resend_attempts(settings.resend_attempts),
65  fragment_queue_length(settings.fragment_queue_length),
66  reliability(settings.reliability),
67  id(settings.id),
68  processes(settings.processes),
69  on_data_received_logic(settings.on_data_received_logic),
70  delay_launch(settings.delay_launch),
71  never_exit(settings.never_exit),
72  send_reduced_message_header(settings.send_reduced_message_header),
73  slack_time(settings.slack_time),
74  read_thread_hertz(settings.read_thread_hertz),
75  max_send_hertz(settings.max_send_hertz),
76  hosts(),
77  no_sending(settings.no_sending),
78  no_receiving(settings.no_receiving),
79  send_history(settings.send_history),
80  debug_to_kb_prefix(settings.debug_to_kb_prefix),
81  read_domains_(settings.read_domains_)
82 {
83  hosts.resize(settings.hosts.size());
84  for (unsigned int i = 0; i < settings.hosts.size(); ++i)
85  hosts[i] = settings.hosts[i];
86 }
87 
89  const TransportSettings& settings)
90 {
91  read_threads = settings.read_threads;
92  write_domain = settings.write_domain;
93  read_domains_ = settings.read_domains_;
94  queue_length = settings.queue_length;
95  type = settings.type;
96  max_fragment_size = settings.max_fragment_size;
97  resend_attempts = settings.resend_attempts;
98  fragment_queue_length = settings.fragment_queue_length;
99  reliability = settings.reliability;
100  id = settings.id;
101  processes = settings.processes;
102 
103  on_data_received_logic = settings.on_data_received_logic;
104  delay_launch = settings.delay_launch;
105  never_exit = settings.never_exit;
106 
107  send_reduced_message_header = settings.send_reduced_message_header;
108  slack_time = settings.slack_time;
109  read_thread_hertz = settings.read_thread_hertz;
110  max_send_hertz = settings.max_send_hertz;
111 
112  hosts.resize(settings.hosts.size());
113  for (unsigned int i = 0; i < settings.hosts.size(); ++i)
114  hosts[i] = settings.hosts[i];
115 
116  no_sending = settings.no_sending;
117  no_receiving = settings.no_receiving;
118 
119  send_history = settings.send_history;
120 
121  debug_to_kb_prefix = settings.debug_to_kb_prefix;
122 }
123 
125 {
126  for (OriginatorFragmentMap::iterator originator = fragment_map.begin();
127  originator != fragment_map.end(); ++originator)
128  {
129  for (ClockFragmentMap::iterator clock = originator->second.begin();
130  clock != originator->second.end(); ++clock)
131  {
132  delete_fragments(clock->second);
133  }
134  }
135 
136  hosts.clear();
137 }
138 
140  const std::string & host)
141 {
142  hosts.push_back(host);
143 }
144 
146 {
147  hosts.clear();
148 }
149 
151  const std::string& filename, const std::string& prefix)
152 {
154  knowledge.load_context(filename);
155 
157 
158  value = knowledge.get(prefix + ".read_threads");
159  if (value.exists())
160  {
161  read_threads = (uint32_t)value.to_integer();
162  }
163 
164  value = knowledge.get(prefix + ".write_domain");
165  if (value.exists())
166  {
167  write_domain = value.to_string();
168  }
169 
170  value = knowledge.get(prefix + ".queue_length");
171  if (value.exists())
172  {
173  queue_length = (uint32_t)value.to_integer();
174  }
175 
176  value = knowledge.get(prefix + ".type");
177  if (value.exists())
178  {
179  type = (uint32_t)value.to_integer();
180  }
181 
182  value = knowledge.get(prefix + ".max_fragment_size");
183  if (value.exists())
184  {
185  max_fragment_size = (uint32_t)value.to_integer();
186  }
187 
188  value = knowledge.get(prefix + ".resend_attempts");
189  if (value.exists())
190  {
191  resend_attempts = (uint32_t)value.to_integer();
192  }
193 
194  value = knowledge.get(prefix + ".fragment_queue_length");
195  if (value.exists())
196  {
197  fragment_queue_length = (uint32_t)value.to_integer();
198  }
199 
200  value = knowledge.get(prefix + ".reliability");
201  if (value.exists())
202  {
203  reliability = (uint32_t)value.to_integer();
204  }
205 
206  value = knowledge.get(prefix + ".id");
207  if (value.exists())
208  {
209  id = (uint32_t)value.to_integer();
210  }
211 
212  value = knowledge.get(prefix + ".processes");
213  if (value.exists())
214  {
215  processes = (uint32_t)value.to_integer();
216  }
217 
218  value = knowledge.get(prefix + ".on_data_received_logic");
219  if (value.exists())
220  {
221  on_data_received_logic = value.to_string();
222  }
223 
224  value = knowledge.get(prefix + ".delay_launch");
225  if (value.exists())
226  {
227  delay_launch = value.is_true();
228  }
229 
230  value = knowledge.get(prefix + ".never_exit");
231  if (value.exists())
232  {
233  never_exit = value.is_true();
234  }
235 
236  value = knowledge.get(prefix + ".send_reduced_message_header");
237  if (value.exists())
238  {
239  send_reduced_message_header = value.is_true();
240  }
241 
242  value = knowledge.get(prefix + ".slack_time");
243  if (value.exists())
244  {
245  slack_time = value.to_double();
246  }
247 
248  value = knowledge.get(prefix + ".max_send_hertz");
249  if (value.exists())
250  {
251  max_send_hertz = value.to_double();
252  }
253 
254  value = knowledge.get(prefix + ".hosts.size");
255  if (value.exists())
256  {
257  containers::StringVector kb_hosts(prefix + ".hosts", knowledge);
258 
259  hosts.resize(kb_hosts.size());
260  for (unsigned int i = 0; i < hosts.size(); ++i)
261  hosts[i] = kb_hosts[i];
262 
263  }
264 
265  containers::Map kb_read_domains(prefix + ".read_domains", knowledge);
266 
267  std::vector<std::string> keys;
268  kb_read_domains.keys(keys);
269 
270  for (unsigned int i = 0; i < keys.size(); ++i)
271  {
272  read_domains_[keys[i]] = 1;
273  }
274 
275  value = knowledge.get(prefix + ".no_sending");
276  if (value.exists())
277  {
278  no_sending = value.is_true();
279  }
280 
281  value = knowledge.get(prefix + ".no_receiving");
282  if (value.exists())
283  {
284  no_receiving = value.is_true();
285  }
286 
287  value = knowledge.get(prefix + ".debug_to_kb_prefix");
288  if (value.exists())
289  {
290  debug_to_kb_prefix = value.is_true();
291  }
292 }
293 
295  const std::string& filename, const std::string& prefix)
296 {
298 
299 #ifndef _MADARA_NO_KARL_
300 
301  knowledge.evaluate(madara::utility::file_to_string(filename));
302 
303 #endif // end karl support
304 
306 
307  value = knowledge.get(prefix + ".read_threads");
308  if (value.exists())
309  {
310  read_threads = (uint32_t)value.to_integer();
311  }
312 
313  value = knowledge.get(prefix + ".write_domain");
314  if (value.exists())
315  {
316  write_domain = value.to_string();
317  }
318 
319  value = knowledge.get(prefix + ".queue_length");
320  if (value.exists())
321  {
322  queue_length = (uint32_t)value.to_integer();
323  }
324 
325  value = knowledge.get(prefix + ".type");
326  if (value.exists())
327  {
328  type = (uint32_t)value.to_integer();
329  }
330 
331  value = knowledge.get(prefix + ".max_fragment_size");
332  if (value.exists())
333  {
334  max_fragment_size = (uint32_t)value.to_integer();
335  }
336 
337  value = knowledge.get(prefix + ".resend_attempts");
338  if (value.exists())
339  {
340  resend_attempts = (uint32_t)value.to_integer();
341  }
342 
343  value = knowledge.get(prefix + ".fragment_queue_length");
344  if (value.exists())
345  {
346  fragment_queue_length = (uint32_t)value.to_integer();
347  }
348 
349  value = knowledge.get(prefix + ".reliability");
350  if (value.exists())
351  {
352  reliability = (uint32_t)value.to_integer();
353  }
354 
355  value = knowledge.get(prefix + ".id");
356  if (value.exists())
357  {
358  id = (uint32_t)value.to_integer();
359  }
360 
361  value = knowledge.get(prefix + ".processes");
362  if (value.exists())
363  {
364  processes = (uint32_t)value.to_integer();
365  }
366 
367  value = knowledge.get(prefix + ".on_data_received_logic");
368  if (value.exists())
369  {
370  on_data_received_logic = value.to_string();
371  }
372 
373  value = knowledge.get(prefix + ".delay_launch");
374  if (value.exists())
375  {
376  delay_launch = value.is_true();
377  }
378 
379  value = knowledge.get(prefix + ".never_exit");
380  if (value.exists())
381  {
382  never_exit = value.is_true();
383  }
384 
385  value = knowledge.get(prefix + ".send_reduced_message_header");
386  if (value.exists())
387  {
388  send_reduced_message_header = value.is_true();
389  }
390 
391  value = knowledge.get(prefix + ".slack_time");
392  if (value.exists())
393  {
394  slack_time = value.to_double();
395  }
396 
397  value = knowledge.get(prefix + ".read_thread_hertz");
398  if (value.exists())
399  {
400  read_thread_hertz = value.to_double();
401  }
402 
403  value = knowledge.get(prefix + ".max_send_hertz");
404  if (value.exists())
405  {
406  max_send_hertz = value.to_double();
407  }
408 
409  value = knowledge.get(prefix + ".hosts.size");
410  if (value.exists())
411  {
412  containers::StringVector kb_hosts(prefix + ".hosts", knowledge);
413 
414  hosts.resize(kb_hosts.size());
415  for (unsigned int i = 0; i < hosts.size(); ++i)
416  hosts[i] = kb_hosts[i];
417 
418  }
419 
420  containers::Map kb_read_domains(prefix + ".read_domains", knowledge);
421 
422  std::vector<std::string> keys;
423  kb_read_domains.keys(keys);
424 
425  for (unsigned int i = 0; i < keys.size(); ++i)
426  {
427  read_domains_[keys[i]] = 1;
428  }
429 
430  value = knowledge.get(prefix + ".no_sending");
431  if (value.exists())
432  {
433  no_sending = value.is_true();
434  }
435 
436  value = knowledge.get(prefix + ".no_receiving");
437  if (value.exists())
438  {
439  no_receiving = value.is_true();
440  }
441 
442  value = knowledge.get(prefix + ".debug_to_kb_prefix");
443  if (value.exists())
444  {
445  debug_to_kb_prefix = value.is_true();
446  }
447 }
448 
450  const std::string& filename, const std::string& prefix) const
451 {
453 
454  // load what exists at the file so we can append/overwrite it
455  knowledge.load_context(madara::utility::file_to_string(filename));
456 
457  containers::StringVector kb_hosts(
458  prefix + ".hosts", knowledge, (int)hosts.size());
459 
460  knowledge.set(prefix + ".read_threads", Integer(read_threads));
461  knowledge.set(prefix + ".write_domain", write_domain);
462  knowledge.set(prefix + ".queue_length", Integer(queue_length));
463  knowledge.set(prefix + ".type", Integer(type));
464  knowledge.set(prefix + ".max_fragment_size", Integer(max_fragment_size));
465  knowledge.set(prefix + ".resend_attempts", Integer(resend_attempts));
466  knowledge.set(
467  prefix + ".fragment_queue_length", Integer(fragment_queue_length));
468  knowledge.set(prefix + ".reliability", Integer(reliability));
469  knowledge.set(prefix + ".id", Integer(id));
470  knowledge.set(prefix + ".processes", Integer(processes));
471 
472  knowledge.set(prefix + ".on_data_received_logic", on_data_received_logic);
473  knowledge.set(prefix + ".delay_launch", Integer(delay_launch));
474  knowledge.set(prefix + ".never_exit", Integer(never_exit));
475 
476  knowledge.set(prefix + ".send_reduced_message_header",
477  Integer(send_reduced_message_header));
478  knowledge.set(prefix + ".slack_time", slack_time);
479  knowledge.set(prefix + ".read_thread_hertz", read_thread_hertz);
480  knowledge.set(prefix + ".max_send_hertz", max_send_hertz);
481 
482  for (size_t i = 0; i < hosts.size(); ++i)
483  kb_hosts.set(i, hosts[i]);
484 
485  knowledge.set(prefix + ".no_sending", Integer(no_sending));
486  knowledge.set(prefix + ".no_receiving", Integer(no_receiving));
487  knowledge.set(prefix + ".debug_to_kb_prefix", debug_to_kb_prefix);
488 
489  knowledge::containers::Map kb_read_domains(
490  prefix + ".read_domains", knowledge);
491  for (std::map<std::string, int>::const_iterator i = read_domains_.begin();
492  i != read_domains_.end(); ++i)
493  {
494  kb_read_domains.set(
495  i->first, (knowledge::KnowledgeRecord::Integer)i->second);
496  }
497 
498  knowledge.save_context(filename);
499 }
500 
502  const std::string& filename, const std::string& prefix) const
503 {
505 
506  // load what exists at the file so we can append/overwrite it
507 #ifndef _MADARA_NO_KARL_
508 
509  knowledge.evaluate(madara::utility::file_to_string(filename));
510 
511 #endif // end karl support
512 
513 
514  containers::StringVector kb_hosts(
515  prefix + ".hosts", knowledge, (int)hosts.size());
516 
517  knowledge.set(prefix + ".read_threads", Integer(read_threads));
518  knowledge.set(prefix + ".write_domain", write_domain);
519  knowledge.set(prefix + ".queue_length", Integer(queue_length));
520  knowledge.set(prefix + ".type", Integer(type));
521  knowledge.set(prefix + ".max_fragment_size", Integer(max_fragment_size));
522  knowledge.set(prefix + ".resend_attempts", Integer(resend_attempts));
523  knowledge.set(
524  prefix + ".fragment_queue_length", Integer(fragment_queue_length));
525  knowledge.set(prefix + ".reliability", Integer(reliability));
526  knowledge.set(prefix + ".id", Integer(id));
527  knowledge.set(prefix + ".processes", Integer(processes));
528 
529  knowledge.set(prefix + ".on_data_received_logic", on_data_received_logic);
530  knowledge.set(prefix + ".delay_launch", Integer(delay_launch));
531  knowledge.set(prefix + ".never_exit", Integer(never_exit));
532 
533  knowledge.set(prefix + ".send_reduced_message_header",
534  Integer(send_reduced_message_header));
535  knowledge.set(prefix + ".slack_time", slack_time);
536  knowledge.set(prefix + ".read_thread_hertz", read_thread_hertz);
537  knowledge.set(prefix + ".max_send_hertz", max_send_hertz);
538 
539  for (size_t i = 0; i < hosts.size(); ++i)
540  kb_hosts.set(i, hosts[i]);
541 
542  knowledge.set(prefix + ".no_sending", Integer(no_sending));
543  knowledge.set(prefix + ".no_receiving", Integer(no_receiving));
544  knowledge.set(prefix + ".debug_to_kb_prefix", debug_to_kb_prefix);
545 
546  knowledge::containers::Map kb_read_domains(
547  prefix + ".read_domains", knowledge);
548  for (std::map<std::string, int>::const_iterator i = read_domains_.begin();
549  i != read_domains_.end(); ++i)
550  {
551  kb_read_domains.set(
552  i->first, (knowledge::KnowledgeRecord::Integer)i->second);
553  }
554 
555  knowledge.save_as_karl(filename);
556 }
madara::knowledge::KnowledgeRecord::Integer Integer
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
This class encapsulates an entry in a KnowledgeBase.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
double to_double(void) const
converts the value to a float/double.
bool is_true(void) const
Checks to see if the record is true.
bool exists(void) const
Checks if record exists (i.e., is not uncreated)
Integer to_integer(void) const
converts the value to an integer.
This class stores a map of strings to KaRL variables.
Definition: Map.h:33
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
Definition: Map.cpp:582
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:501
This class stores a vector of strings inside of KaRL.
Definition: StringVector.h:32
int set(size_t index, const type &value)
Sets a knowledge variable to a specified value.
size_t size(void) const
Returns the size of the local vector.
Holds basic transport settings.
double max_send_hertz
Maximum rate of sending messages.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
bool no_receiving
if true, never receive over transport
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a text file.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool delay_launch
Delay launching transports until explicit activate call.
uint32_t reliability
Reliability required of the transport.
uint32_t id
The id of this process (DEPRECATED). You do not need to set this.
uint32_t processes
Number of processes (DEPRECATED). You do not need to set this.
TransportSettings()=default
Constructor for this class.
std::string on_data_received_logic
Logic to be evaluated after every successful update.
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
std::vector< std::string > hosts
Host information for transports that require it.
uint32_t queue_length
Length of the buffer used to store history of events.
bool send_history
if true, send all updates since last send, for records that have history enabled (if the history capa...
uint32_t type
Type of transport. See madara::transport::Types for options.
bool never_exit
Prevent MADARA from exiting on fatal errors and invalid state.
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
void operator=(const TransportSettings &settings)
Assignment operator.
uint32_t read_threads
the number of read threads to start
void clear_hosts(void)
Safely clears hosts from the settings.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a binary file.
std::string debug_to_kb_prefix
if not empty, save debug information to knowledge base at prefix
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
int resend_attempts
Maximum number of attempts to resend if transport is busy.
double read_thread_hertz
Number of valid messages allowed to be received per second.
double slack_time
Time to sleep between sends and rebroadcasts.
bool send_reduced_message_header
Send a reduced message header (clock, size, updates, KaRL id)
void add_host(const std::string &host)
Safely add hosts to the settings.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
constexpr string_t string
Provides functions and classes for the distributed knowledge base.
MADARA_EXPORT std::string types_to_string(int id)
Converts a transport type enum to a string equivalent.
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
std::string file_to_string(const std::string &filename)
Reads a file into a string.
Definition: Utility.cpp:324