MADARA  3.2.3
TransportSettings.cpp
Go to the documentation of this file.
1 #include "TransportSettings.h"
2 #include "Fragmentation.h"
6 
9 
// Body of types_to_string (int id): maps a transport type id to a
// human-readable name.
// NOTE: the signature (listing lines 10-11) is missing from this extract; the
// function name and parameter come from the reference index at the end of
// the page.
// The id is compared against each known transport constant in turn and the
// first match returns its display string.
12 {
13  if (NO_TRANSPORT == id)
14  {
15  return "None";
16  }
17  if (SPLICE == id)
18  {
19  return "Splice DDS";
20  }
21  if (NDDS == id)
22  {
23  return "RTI DDS";
24  }
25  if (UDP == id)
26  {
27  return "UDP";
28  }
29  if (TCP == id)
30  {
31  return "TCP (unsupported)";
32  }
33  if (MULTICAST == id)
34  {
35  return "UDP Multicast";
36  }
37  if (BROADCAST == id)
38  {
39  return "UDP Broadcast";
40  }
41  if (REGISTRY_SERVER == id)
42  {
43  return "UDP Registry Server";
44  }
45  if (REGISTRY_CLIENT == id)
46  {
47  return "UDP Registry Client";
48  }
49  if (ZMQ == id)
50  {
51  return "0MQ";
52  }
53 
// Any id that matched none of the constants above falls through to here.
54  // otherwise, it's a custom transport
55  return "Custom";
56 }
57 
// Member-initializer list and (empty) body of the default constructor.
// NOTE: the constructor header (listing line 58) is missing from this extract.
// Settings are initialised from the DEFAULT_* / MAXIMUM_* macros where one
// exists and from literals otherwise; string and vector members are
// default-constructed (empty).
59  write_domain (DEFAULT_DOMAIN),
60  read_threads (1),
61  queue_length (DEFAULT_QUEUE_LENGTH),
62  type (DEFAULT_TRANSPORT),
63  max_fragment_size (62000),
64  resend_attempts (MAXIMUM_RESEND_ATTEMPTS),
65  fragment_queue_length (5),
66  reliability (DEFAULT_RELIABILITY),
67  id (DEFAULT_ID),
68  processes (DEFAULT_PROCESSES),
69  on_data_received_logic (),
70  delay_launch (false),
71  never_exit (false),
72  send_reduced_message_header (false),
73  slack_time (0),
74  read_thread_hertz (0.0),
75  hosts (),
76  no_sending (false),
77  no_receiving (false)
78 {
79 }
80 
// Copy constructor: initialises each member from the matching member of
// `settings`.
// NOTE: listing lines 81, 87-89, 93, 96 and 98 are absent from this extract.
// They presumably hold the constructor name and the initialisers for
// max_fragment_size, resend_attempts, fragment_queue_length,
// on_data_received_logic, send_reduced_message_header and read_thread_hertz
// (the members operator= copies but that are not visible here) -- confirm
// against the original file before relying on this listing.
82  const TransportSettings & settings) :
83  write_domain (settings.write_domain),
84  read_threads (settings.read_threads),
85  queue_length (settings.queue_length),
86  type (settings.type),
90  reliability (settings.reliability),
91  id (settings.id),
92  processes (settings.processes),
94  delay_launch (settings.delay_launch),
95  never_exit (settings.never_exit),
97  slack_time (settings.slack_time),
// hosts starts empty and is filled element by element in the body below.
99  hosts (),
100  no_sending (settings.no_sending),
101  no_receiving (settings.no_receiving),
102  read_domains_ (settings.read_domains_)
103 {
// Size the host list to match the source, then copy each entry by index.
104  hosts.resize (settings.hosts.size ());
105  for (unsigned int i = 0; i < settings.hosts.size (); ++i)
106  hosts[i] = settings.hosts[i];
107 }
108 
// Assignment operator: copies every setting from `settings` into this object.
// NOTE: the line carrying the function name (listing line 110) is missing
// from this extract; the reference index lists it as
// `void operator=(const TransportSettings &settings)`.
// The return type is void, so assignments cannot be chained.
109 void
111  const TransportSettings & settings)
112 {
113  read_threads = settings.read_threads;
114  write_domain = settings.write_domain;
115  read_domains_ = settings.read_domains_;
116  queue_length = settings.queue_length;
117  type = settings.type;
118  max_fragment_size = settings.max_fragment_size;
119  resend_attempts = settings.resend_attempts;
120  fragment_queue_length = settings.fragment_queue_length;
121  reliability = settings.reliability;
122  id = settings.id;
123  processes = settings.processes;
124 
125  on_data_received_logic = settings.on_data_received_logic;
126  delay_launch = settings.delay_launch;
127  never_exit = settings.never_exit;
128 
129  send_reduced_message_header = settings.send_reduced_message_header;
130  slack_time = settings.slack_time;
131  read_thread_hertz = settings.read_thread_hertz;
132 
// Resize to the source's host count, then copy each host entry by index.
133  hosts.resize (settings.hosts.size ());
134  for (unsigned int i = 0; i < settings.hosts.size (); ++i)
135  hosts[i] = settings.hosts[i];
136 
137  no_sending = settings.no_sending;
138  no_receiving = settings.no_receiving;
139 }
140 
// Releases stored message fragments.
// NOTE(review): the header (listing line 141) is missing from this extract;
// this is presumably the destructor -- confirm against the original file.
// Walks fragment_map originator by originator, then clock by clock, and
// hands each innermost fragment map to delete_fragments.
142 {
143  for (OriginatorFragmentMap::iterator originator = fragment_map.begin ();
144  originator != fragment_map.end (); ++originator)
145  {
146  for (ClockFragmentMap::iterator clock = originator->second.begin ();
147  clock != originator->second.end (); ++clock)
148  {
149  delete_fragments (clock->second);
150  }
151  }
152 }
153 
// Loads the settings from a saved knowledge-base context file (the `load`
// entry in the reference index: "Loads the settings from a binary file").
// NOTE: listing lines 155 (function name / filename parameter) and 158 are
// missing from this extract; line 158 presumably declares the local
// `knowledge` knowledge base used below -- confirm against the original file.
// Every setting is read from the variable "<prefix>.<setting name>".
154 void
156  const std::string & prefix)
157 {
// load_context is given the file name itself and reads the file.
159  knowledge.load_context (filename);
160 
// Integer-valued settings are narrowed to uint32_t with a C-style cast.
161  read_threads = (uint32_t) knowledge.get (prefix + ".read_threads").to_integer ();
162  write_domain = knowledge.get (prefix + ".write_domain").to_string ();
163  queue_length = (uint32_t)knowledge.get (prefix + ".queue_length").to_integer ();
164  type = (uint32_t)knowledge.get (prefix + ".type").to_integer ();
165  max_fragment_size = (uint32_t)knowledge.get (prefix + ".max_fragment_size").to_integer ();
166  resend_attempts = (uint32_t)knowledge.get (prefix + ".resend_attempts").to_integer ();
167  fragment_queue_length = (uint32_t)knowledge.get (prefix + ".fragment_queue_length").to_integer ();
168  reliability = (uint32_t)knowledge.get (prefix + ".reliability").to_integer ();
169  id = (uint32_t)knowledge.get (prefix + ".id").to_integer ();
170  processes = (uint32_t)knowledge.get (prefix + ".processes").to_integer ();
171 
172  on_data_received_logic = knowledge.get (prefix + ".on_data_received_logic").to_string ();
173  delay_launch = knowledge.get (prefix + ".delay_launch").is_true ();
174  never_exit = knowledge.get (prefix + ".never_exit").is_true ();
175 
176  send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
177  slack_time = knowledge.get (prefix + ".slack_time").to_double ();
178  read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
179 
// Hosts are stored as a StringVector container under "<prefix>.hosts".
180  containers::StringVector kb_hosts (prefix + ".hosts", knowledge);
181 
182  hosts.resize (kb_hosts.size ());
183  for (unsigned int i = 0; i < hosts.size (); ++i)
184  hosts[i] = kb_hosts[i];
185 
186 
// Read domains are stored as a Map container; only its keys are used.
187  containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
188 
189  std::vector <std::string> keys;
190  kb_read_domains.keys (keys);
191 
// Each key is recorded with the value 1. Entries already present in
// read_domains_ are not removed by this loop.
192  for (unsigned int i = 0; i < keys.size (); ++i)
193  {
194  read_domains_[keys[i]] = 1;
195  }
196 
197  no_sending = knowledge.get (prefix + ".no_sending").is_true ();
198  no_receiving = knowledge.get (prefix + ".no_receiving").is_true ();
199 }
200 
// Loads the settings from a text file (the `load_text` entry in the
// reference index: "Loads the settings from a text file").
// NOTE: listing lines 202 (function name / filename parameter) and 205 are
// missing from this extract; line 205 presumably declares the local
// `knowledge` knowledge base used below -- confirm against the original file.
// Every setting is read from the variable "<prefix>.<setting name>".
201 void
203  const std::string & prefix)
204 {
// The file is read into a string and evaluated as an expression, which
// populates the knowledge base with whatever the file assigns.
206  knowledge.evaluate (madara::utility::file_to_string (filename));
207 
// Integer-valued settings are narrowed to uint32_t with a C-style cast.
208  read_threads = (uint32_t) knowledge.get (prefix + ".read_threads").to_integer ();
209  write_domain = knowledge.get (prefix + ".write_domain").to_string ();
210  queue_length = (uint32_t)knowledge.get (prefix + ".queue_length").to_integer ();
211  type = (uint32_t)knowledge.get (prefix + ".type").to_integer ();
212  max_fragment_size = (uint32_t)knowledge.get (prefix + ".max_fragment_size").to_integer ();
213  resend_attempts = (uint32_t)knowledge.get (prefix + ".resend_attempts").to_integer ();
214  fragment_queue_length = (uint32_t)knowledge.get (prefix + ".fragment_queue_length").to_integer ();
215  reliability = (uint32_t)knowledge.get (prefix + ".reliability").to_integer ();
216  id = (uint32_t)knowledge.get (prefix + ".id").to_integer ();
217  processes = (uint32_t)knowledge.get (prefix + ".processes").to_integer ();
218 
219  on_data_received_logic = knowledge.get (prefix + ".on_data_received_logic").to_string ();
220  delay_launch = knowledge.get (prefix + ".delay_launch").is_true ();
221  never_exit = knowledge.get (prefix + ".never_exit").is_true ();
222 
223  send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
224  slack_time = knowledge.get (prefix + ".slack_time").to_double ();
225  read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
226 
// Hosts are stored as a StringVector container under "<prefix>.hosts".
227  containers::StringVector kb_hosts (prefix + ".hosts", knowledge);
228 
229  hosts.resize (kb_hosts.size ());
230  for (unsigned int i = 0; i < hosts.size (); ++i)
231  hosts[i] = kb_hosts[i];
232 
233 
// Read domains are stored as a Map container; only its keys are used.
234  containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
235 
236  std::vector <std::string> keys;
237  kb_read_domains.keys (keys);
238 
// Each key is recorded with the value 1. Entries already present in
// read_domains_ are not removed by this loop.
239  for (unsigned int i = 0; i < keys.size (); ++i)
240  {
241  read_domains_[keys[i]] = 1;
242  }
243 
244  no_sending = knowledge.get (prefix + ".no_sending").is_true ();
245  no_receiving = knowledge.get (prefix + ".no_receiving").is_true ();
246 }
247 
// Saves the settings to a knowledge-base context file (the `save` entry in
// the reference index; it ends with save_context).
// NOTE: listing lines 249 (function name / filename parameter), 252 and 292
// are missing from this extract. Line 252 presumably declares the local
// `knowledge` knowledge base and line 292 the value argument of the
// kb_read_domains.set call -- confirm against the original file.
// Every setting is written to the variable "<prefix>.<setting name>".
248 void
250  const std::string & prefix) const
251 {
253 
254  // load what exists at the file so we can append/overwrite it
// FIX: load_context takes a file name (load() passes `filename` directly).
// The previous code passed the result of file_to_string (filename), i.e. the
// file's contents, as if it were the name of the file to open.
255  knowledge.load_context (filename);
256 
// Container over "<prefix>.hosts", sized to the current host count.
257  containers::StringVector kb_hosts (prefix + ".hosts", knowledge,
258  (int)hosts.size ());
259 
260  knowledge.set (prefix + ".read_threads", Integer (read_threads));
261  knowledge.set (prefix + ".write_domain", write_domain);
262  knowledge.set (prefix + ".queue_length", Integer (queue_length));
263  knowledge.set (prefix + ".type", Integer (type));
264  knowledge.set (prefix + ".max_fragment_size", Integer (max_fragment_size));
265  knowledge.set (prefix + ".resend_attempts", Integer (resend_attempts));
266  knowledge.set (prefix + ".fragment_queue_length",
267  Integer (fragment_queue_length));
268  knowledge.set (prefix + ".reliability", Integer (reliability));
269  knowledge.set (prefix + ".id", Integer (id));
270  knowledge.set (prefix + ".processes", Integer (processes));
271 
// Boolean flags are stored as integers.
272  knowledge.set (prefix + ".on_data_received_logic", on_data_received_logic);
273  knowledge.set (prefix + ".delay_launch", Integer (delay_launch));
274  knowledge.set (prefix + ".never_exit", Integer (never_exit));
275 
276  knowledge.set (prefix + ".send_reduced_message_header",
277  Integer (send_reduced_message_header));
278  knowledge.set (prefix + ".slack_time", slack_time);
279  knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
280 
281  for (size_t i = 0; i < hosts.size (); ++i)
282  kb_hosts.set (i, hosts[i]);
283 
284  knowledge.set (prefix + ".no_sending", Integer (no_sending));
285  knowledge.set (prefix + ".no_receiving", Integer (no_receiving));
286 
// One map entry is written per read domain, keyed by the domain name.
287  knowledge::containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
288  for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
289  i != read_domains_.end (); ++i)
290  {
291  kb_read_domains.set (i->first,
293  }
294 
295  knowledge.save_context (filename);
296 }
297 
// Saves the settings to a text file as KaRL assignments (the `save_text`
// entry in the reference index; it ends with save_as_karl).
// NOTE: listing lines 299 (function name / filename parameter), 302 and 342
// are missing from this extract. Line 302 presumably declares the local
// `knowledge` knowledge base and line 342 the value argument of the
// kb_read_domains.set call -- confirm against the original file.
// Every setting is written to the variable "<prefix>.<setting name>".
298 void
300  const std::string & prefix) const
301 {
303 
304  // load what exists at the file so we can append/overwrite it
// The existing file is read into a string and evaluated, mirroring how
// load_text reads it.
305  knowledge.evaluate (madara::utility::file_to_string (filename));
306 
// Container over "<prefix>.hosts", sized to the current host count.
307  containers::StringVector kb_hosts (prefix + ".hosts", knowledge,
308  (int)hosts.size ());
309 
310  knowledge.set (prefix + ".read_threads", Integer (read_threads));
311  knowledge.set (prefix + ".write_domain", write_domain);
312  knowledge.set (prefix + ".queue_length", Integer (queue_length));
313  knowledge.set (prefix + ".type", Integer (type));
314  knowledge.set (prefix + ".max_fragment_size", Integer (max_fragment_size));
315  knowledge.set (prefix + ".resend_attempts", Integer (resend_attempts));
316  knowledge.set (prefix + ".fragment_queue_length",
317  Integer (fragment_queue_length));
318  knowledge.set (prefix + ".reliability", Integer (reliability));
319  knowledge.set (prefix + ".id", Integer (id));
320  knowledge.set (prefix + ".processes", Integer (processes));
321 
// Boolean flags are stored as integers.
322  knowledge.set (prefix + ".on_data_received_logic", on_data_received_logic);
323  knowledge.set (prefix + ".delay_launch", Integer (delay_launch));
324  knowledge.set (prefix + ".never_exit", Integer (never_exit));
325 
326  knowledge.set (prefix + ".send_reduced_message_header",
327  Integer (send_reduced_message_header));
328  knowledge.set (prefix + ".slack_time", slack_time);
329  knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
330 
331  for (size_t i = 0; i < hosts.size (); ++i)
332  kb_hosts.set (i, hosts[i]);
333 
334  knowledge.set (prefix + ".no_sending", Integer (no_sending));
335  knowledge.set (prefix + ".no_receiving", Integer (no_receiving));
336 
// One map entry is written per read domain, keyed by the domain name.
337  knowledge::containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
338  for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
339  i != read_domains_.end (); ++i)
340  {
341  kb_read_domains.set (i->first,
343  }
344 
345  knowledge.save_as_karl (filename);
346 }
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
bool is_true(void) const
Checks to see if the record is true.
#define DEFAULT_ID
Default id in group.
int64_t save_context(const std::string &filename) const
Saves the context to a file.
double to_double(void) const
converts the value to a float/double.
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:516
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
madara::knowledge::KnowledgeRecord::Integer Integer
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Retrieves a knowledge value.
#define MAXIMUM_RESEND_ATTEMPTS
Default maximum number of resend attempts (initial value of resend_attempts).
uint32_t type
Type of transport. See madara::transport::Types for options.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a binary file.
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
uint32_t id
the id of this process.
This class stores a vector of strings inside of KaRL.
Definition: StringVector.h:31
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
Definition: Map.cpp:605
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings to a text file.
static struct madara::knowledge::tags::string_t string
uint32_t processes
number of processes expected in the network (best to overestimate if building latency tables ...
bool no_receiving
if true, never receive over transport
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
#define DEFAULT_DOMAIN
Default knowledge domain.
TransportSettings()
Constructor for this class.
std::string write_domain
All class members are accessible to users for easy setup.
This class stores a map of strings to KaRL variables.
Definition: Map.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:45
madara::knowledge::KnowledgeRecord::Integer Integer
MADARA_EXPORT std::string types_to_string(int id)
Converts a transport type enum to a string equivalent.
bool no_sending
if true, never send over transport
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
std::string file_to_string(const std::string &filename)
Reads a file into a string.
Definition: Utility.cpp:347
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Integer to_integer(void) const
converts the value to an integer.
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
MADARA_EXPORT void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
#define DEFAULT_PROCESSES
Default number of processes in group.
uint32_t queue_length
Length of the buffer used to store history of events.
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
Provides functions and classes for the distributed knowledge base.
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression, const EvalSettings &settings=EvalSettings())
Evaluates an expression.
uint32_t reliability
Reliability required of the transport.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
int64_t load_context(const std::string &filename, bool use_id=true, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
void operator=(const TransportSettings &settings)
Assignment operator.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
bool delay_launch
delay launching transports
size_t size(void) const
Returns the size of the local vector.