MADARA  3.4.1
SpliceDDSTransport.cpp
Go to the documentation of this file.
4 #include <time.h>
5 
6 #include <iostream>
7 #include <sstream>
9 
10 namespace logger = madara::logger;
11 
13  "MADARA_KaRL_Data", "MADARA_KaRL_Control"};
14 
15 /* Array to hold the names for all ReturnCodes. */
17  "DDS_RETCODE_OK", "DDS_RETCODE_ERROR", "DDS_RETCODE_UNSUPPORTED",
18  "DDS_RETCODE_BAD_PARAMETER", "DDS_RETCODE_PRECONDITION_NOT_MET",
19  "DDS_RETCODE_OUT_OF_RESOURCES", "DDS_RETCODE_NOT_ENABLED",
20  "DDS_RETCODE_IMMUTABLE_POLICY", "DDS_RETCODE_INCONSISTENT_POLICY",
21  "DDS_RETCODE_ALREADY_DELETED", "DDS_RETCODE_TIMEOUT", "DDS_RETCODE_NO_DATA",
22  "DDS_RETCODE_ILLEGAL_OPERATION"};
23 
25  "Madara_knowledge";
26 
29  bool launch_transport)
30  : madara::transport::Base(id, config, context),
31  domain_(0),
32  domain_factory_(0),
33  domain_participant_(0),
34  publisher_(0),
35  subscriber_(0),
36  datawriter_(0),
37  datareader_(0),
38  update_writer_(0),
39  update_reader_(0),
40  update_topic_(0)
41 {
42  // create a reference to the knowledge base for threading
43  knowledge_.use(context);
44 
45  // set the data plane for the read threads
47 
48  if (launch_transport)
49  setup();
50 }
52 {
53  close();
54 }
55 
57 {
58  this->invalidate_transport();
59 
60  read_threads_.terminate();
61 
62  read_threads_.wait();
63 
64  // if (subscriber_.in ())
65  //{
66  // subscriber_->delete_datareader (update_reader_);
67  //}
68 
69  // if (publisher_.in ())
70  //{
71  // publisher_->delete_datawriter (update_writer_);
72  // publisher_->delete_datawriter (latency_update_writer_);
73  //}
74 
75  // if (domain_participant_.in ())
76  //{
77  // domain_participant_->delete_subscriber (subscriber_);
78  // domain_participant_->delete_publisher (publisher_);
79  // domain_participant_->delete_topic (update_topic_);
80  //}
81 
82  // if (domain_factory_.in ())
83  // domain_factory_->delete_participant (domain_participant_);
84 
85  update_reader_ = 0;
86  update_writer_ = 0;
87  update_topic_ = 0;
88  subscriber_ = 0;
89  publisher_ = 0;
90  domain_participant_ = 0;
91  domain_factory_ = 0;
92 
93  this->shutting_down_ = false;
94 }
95 
97 {
98  return this->settings_.reliability;
99 }
100 
102 {
103  return this->settings_.reliability = setting;
104 }
105 
107 {
108  Base::setup();
109  DDS::ReturnCode_t status;
110 
111  this->is_valid_ = false;
112 
113  // reset the valid setup flag
114  // valid_setup_ = false;
115 
117  "SpliceDDSTransport::setup:"
118  " Creating a participant for topic (%s)\n",
119  madara::utility::dds_topicify(settings_.write_domain).c_str());
120 
122  "SpliceDDSTransport::setup:"
123  " Participant settings are being read from the OSPL_URI environment"
124  " variable\n",
125  madara::utility::dds_topicify(settings_.write_domain).c_str());
126 
127  // get the domain participant factory
128  domain_factory_ = DDS::DomainParticipantFactory::get_instance();
129  domain_factory_->get_default_participant_qos(part_qos_);
130  domain_participant_ = domain_factory_->create_participant(
131  domain_, part_qos_, NULL, DDS::STATUS_MASK_NONE);
132 
133  // if dp == NULL, we've got an error
134  if (domain_participant_ == NULL)
135  {
137  "\nSpliceDDSTransport::setup:"
138  " splice daemon not running. Try 'ospl start'...\n");
139 
140  return -2;
141  }
142 
143  domain_participant_->get_default_topic_qos(topic_qos_);
144 
145  if (madara::transport::RELIABLE == this->settings_.reliability)
146  {
147  topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
148  topic_qos_.history.depth = this->settings_.queue_length;
149  topic_qos_.resource_limits.max_samples_per_instance =
150  this->settings_.queue_length;
151  topic_qos_.resource_limits.max_samples = this->settings_.queue_length;
152  topic_qos_.destination_order.kind =
153  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
154  // topic_qos_.
155  }
156  // topic_qos_.resource_limits.max_samples_per_instance= 10;
157  domain_participant_->set_default_topic_qos(topic_qos_);
158 
160  "SpliceDDSTransport::setup:"
161  " Registering type support\n");
162 
163  // Register Update type
164  status = this->update_type_support_.register_type(
165  domain_participant_, "Knowledge::Update");
166  if (int ret = check_status(
167  status, "Knowledge::UpdateTypeSupport::register_type") < 0)
168  return ret;
169 
170  // Register Mutex type
171  // status = this->mutex_type_support_.register_type (
172  // domain_participant_, "Knowledge::Mutex");
173  // check_status(status, "Knowledge::MutexTypeSupport::register_type");
174 
176  "SpliceDDSTransport::setup:"
177  " Setting up knowledge domain via topic (%s)\n",
178  madara::utility::dds_topicify(settings_.write_domain).c_str());
179 
180  // Create Update topic
181  update_topic_ = domain_participant_->create_topic(
182  madara::utility::dds_topicify(settings_.write_domain).c_str(),
183  "Knowledge::Update", topic_qos_, NULL, DDS::STATUS_MASK_NONE);
184 
185  if (int ret =
186  check_handle(update_topic_,
187  "DDS::DomainParticipant::create_topic (KnowledgeUpdate)") < 0)
188  return ret;
189 
190  // Get default qos for publisher
191  status = domain_participant_->get_default_publisher_qos(pub_qos_);
192  if (int ret = check_status(status,
193  "DDS::DomainParticipant::get_default_publisher_qos") < 0)
194  return ret;
195 
196  if (madara::transport::RELIABLE == this->settings_.reliability)
197  {
198  pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
199  pub_qos_.presentation.coherent_access = true;
200  pub_qos_.presentation.ordered_access = false;
201  // topic_qos_.
202  }
203 
205  "SpliceDDSTransport::setup:"
206  " Creating publisher for topic (%s)\n",
207  madara::utility::dds_topicify(settings_.write_domain).c_str());
208 
209  // Create publisher
210  pub_qos_.partition.name.length(1);
211  pub_qos_.partition.name[0] = DDS::string_dup(partition_);
212  publisher_ = domain_participant_->create_publisher(
213  pub_qos_, NULL, DDS::STATUS_MASK_NONE);
214  if (int ret = check_handle(
215  publisher_, "DDS::DomainParticipant::create_publisher") < 0)
216  return ret;
217 
218  // Create subscriber
219  status = domain_participant_->get_default_subscriber_qos(sub_qos_);
220  if (int ret = check_status(status,
221  "DDS::DomainParticipant::get_default_subscriber_qos") < 0)
222  return ret;
223 
224  if (madara::transport::RELIABLE == this->settings_.reliability)
225  {
226  sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
227  sub_qos_.presentation.coherent_access = true;
228  sub_qos_.presentation.ordered_access = false;
229  }
230 
232  "SpliceDDSTransport::setup:"
233  " Creating subscriber for topic (%s)\n",
234  madara::utility::dds_topicify(settings_.write_domain).c_str());
235 
236  sub_qos_.partition.name.length(1);
237  sub_qos_.partition.name[0] = DDS::string_dup(partition_);
238  subscriber_ = domain_participant_->create_subscriber(
239  // sub_qos_, &sub_listener_, DDS::DATA_AVAILABLE_STATUS |
240  // DDS::DATA_ON_READERS_STATUS);
241  sub_qos_, NULL, DDS::STATUS_MASK_NONE);
242  if (int ret = check_handle(subscriber_,
243  "DDS::DomainParticipant::create_subscriber") < 0)
244  return ret;
245 
246  if (!subscriber_ || !publisher_)
247  {
249  "SpliceDDSTransport::setup:"
250  " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
251 
252  return -2;
253  }
254 
255  // Create datawriter
256  publisher_->get_default_datawriter_qos(datawriter_qos_);
257  publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
258 
259  if (madara::transport::RELIABLE == this->settings_.reliability)
260  {
262  "SpliceDDSTransport::setup:"
263  " Enabling reliable transport for (%s) datawriters\n",
264  madara::utility::dds_topicify(settings_.write_domain).c_str());
265 
266  datawriter_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
267  datawriter_qos_.history.depth = this->settings_.queue_length;
268  datawriter_qos_.resource_limits.max_samples = this->settings_.queue_length;
269  datawriter_qos_.resource_limits.max_samples_per_instance =
270  this->settings_.queue_length;
271  datawriter_qos_.destination_order.kind =
272  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
273  }
274  else
275  {
277  "SpliceDDSTransport::setup:"
278  " Enabling unreliable transport for (%s) datawriters\n",
279  madara::utility::dds_topicify(settings_.write_domain).c_str());
280  }
281 
283  "SpliceDDSTransport::setup:"
284  " Creating datawriter for topic (%s)\n",
285  madara::utility::dds_topicify(settings_.write_domain).c_str());
286 
287  // Create Update writer
288  datawriter_ = publisher_->create_datawriter(
289  update_topic_, datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
290  if (int ret = check_handle(datawriter_,
291  "DDS::Publisher::create_datawriter (Update)") < 0)
292  return ret;
293  update_writer_ =
294  dynamic_cast<Knowledge::UpdateDataWriter_ptr>(datawriter_.in());
295  if (int ret = check_handle(update_writer_,
296  "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
297  return ret;
298 
299  // Create Latency Update writer for Read Thread
300  latencywriter_ = publisher_->create_datawriter(
301  update_topic_, datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
302  if (int ret = check_handle(latencywriter_,
303  "DDS::Publisher::create_datawriter (Update)") < 0)
304  return ret;
305  latency_update_writer_ =
306  dynamic_cast<Knowledge::UpdateDataWriter_ptr>(latencywriter_.in());
307  if (int ret = check_handle(latency_update_writer_,
308  "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
309  return ret;
310 
311  // Create datareader
312  status = subscriber_->get_default_datareader_qos(datareader_qos_);
313  subscriber_->copy_from_topic_qos(datareader_qos_, topic_qos_);
314  // publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
315  if (int ret = check_status(
316  status, "DDS::Subscriber::get_default_datareader_qos") < 0)
317  return ret;
318 
319  datareader_qos_.reader_data_lifecycle.enable_invalid_samples = FALSE;
320 
321  if (madara::transport::RELIABLE == this->settings_.reliability)
322  {
324  "SpliceDDSTransport::setup:"
325  " Enabling reliable transport for (%s) datareaders\n",
326  madara::utility::dds_topicify(settings_.write_domain).c_str());
327 
328  datareader_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
329  datareader_qos_.history.depth = this->settings_.queue_length;
330  datareader_qos_.resource_limits.max_samples = this->settings_.queue_length;
331  datareader_qos_.destination_order.kind =
332  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
333 
334  // unlike the other qos, we do not set max_samples_per_instance here.
335  // that shouldn't be as necessary, since we are using take on the reader
336  }
337  else
338  {
340  "SpliceDDSTransport::setup:"
341  " Enabling unreliable transport for (%s) datareaders\n",
342  madara::utility::dds_topicify(settings_.write_domain).c_str());
343  }
344 
346  "SpliceDDSTransport::setup:"
347  " Creating datareader for topic (%s)\n",
348  madara::utility::dds_topicify(settings_.write_domain).c_str());
349 
350  // Create Update datareader
351  datareader_ = subscriber_->create_datareader(update_topic_,
352  // datareader_qos_, &dr_listener_, DDS::STATUS_MASK_NONE);
353  datareader_qos_, NULL, DDS::STATUS_MASK_NONE);
354 
355  // notes: we set the mask to none because the listener will be called
356  // by the subscriber listener with notify_datareaders. This is the Splice
357  // way of doing this, since we require subscription information and they
358  // have so far not implemented on_subscription_matched.
359 
360  if (int ret = check_handle(datareader_,
361  "DDS::Subscriber::create_datareader (Update)") < 0)
362  return ret;
363  update_reader_ =
364  dynamic_cast<Knowledge::UpdateDataReader_ptr>(datareader_.in());
365  if (int ret = check_handle(update_reader_,
366  "Knowledge::UpdateDataReader_ptr::narrow") < 0)
367  return ret;
368 
369  if (!settings_.no_receiving)
370  {
371  double hertz = settings_.read_thread_hertz;
372  if (hertz < 0.0)
373  {
374  hertz = 0.0;
375  }
376 
378  "UdpTransportReadThread::setup:"
379  " starting %d threads at %f hertz\n",
380  settings_.read_threads, hertz);
381 
382  for (uint32_t i = 0; i < settings_.read_threads; ++i)
383  {
384  std::stringstream thread_name;
385  thread_name << "read";
386  thread_name << i;
387 
388  read_threads_.run(hertz, thread_name.str(),
389  new madara::transport::SpliceReadThread(id_, this->settings_,
390  context_, update_reader_, latency_update_writer_, send_monitor_,
391  receive_monitor_, packet_scheduler_));
392  }
393  }
394 
395  return this->validate_transport();
396 }
397 
399  const knowledge::KnowledgeMap& updates)
400 {
401  long result = 0;
402 
403  if (!settings_.no_sending)
404  {
405  result = prep_send(updates, "SpliceDDSTransport::send_data:");
406 
407  // get the maximum quality from the updates
408  uint32_t quality = knowledge::max_quality(updates);
409 
411  unsigned long long cur_clock = context_.get_clock();
412 
413  DDS::ReturnCode_t dds_result;
414  DDS::InstanceHandle_t handle;
415 
416  Knowledge::Update data;
417 
418  data.buffer =
419  Knowledge::seq_oct(result, result, (unsigned char*)buffer_.get_ptr());
420  data.clock = cur_clock;
421  data.quality = quality;
422  data.updates = DDS::ULong(updates.size());
423  data.originator = DDS::string_dup(id_.c_str());
424  data.type = madara::transport::MULTIASSIGN;
425  data.ttl = settings_.get_rebroadcast_ttl();
426  data.timestamp = utility::get_time();
427  data.madara_id = DDS::string_dup(MADARA_IDENTIFIER);
428 
430  "SpliceDDSTransport::send:"
431  " sending multiassignment: %d updates, time=llu, quality=%d\n",
432  data.updates, cur_clock, quality);
433 
434  handle = update_writer_->register_instance(data);
435  dds_result = update_writer_->write(data, handle);
436  result = (long)dds_result;
437  // update_writer_->unregister_instance (data, handle);
438  }
439 
440  return result;
441 }
442 
444  void* handle, const char* info)
445 {
446  if (!handle)
447  {
449  "SpliceDDSTransport::check_handle:"
450  " error in %s: Creation failed: invalid handle\n",
451  info);
452 
453  return -2;
454  }
455 
456  return 0;
457 }
458 
460  DDS::ReturnCode_t status, const char* info)
461 {
462  // if the status is okay, then return without issue
463  if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
464  return 0;
465 
467  "SpliceDDSTransport::check_status:"
468  " error in %s: Creation failed: %s\n",
469  info, get_error_name(status));
470 
471  return -2;
472 }
473 
475  DDS::ReturnCode_t status)
476 {
477  return ret_code_names[status];
478 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:22
const ThreadSafeContext * context_
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thr...
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:168
Base class from which all transports must be derived.
Definition: Transport.h:46
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:32
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
threads::Threader read_threads_
threads for reading knowledge updates
long send_data(const knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
int setup(void)
Activates this transport.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
void close(void)
Closes this transport.
int check_handle(void *handle, const char *info)
Splice handle checker.
int check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
int reliability(void) const
Accesses reliability setting.
Thread for reading knowledge updates via waitsets.
Holds basic transport settings.
constexpr string_t string
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
Definition: Utility.inl:76
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
Copyright(c) 2020 Galois.