MADARA  3.2.3
SpliceDDSTransport.cpp
Go to the documentation of this file.
4 #include <time.h>
5 
6 #include <iostream>
7 #include <sstream>
9 
10 namespace logger = madara::logger;
11 
13  "MADARA_KaRL_Data",
14  "MADARA_KaRL_Control"
15 };
16 
17 /* Array to hold the names for all ReturnCodes. */
19  "DDS_RETCODE_OK",
20  "DDS_RETCODE_ERROR",
21  "DDS_RETCODE_UNSUPPORTED",
22  "DDS_RETCODE_BAD_PARAMETER",
23  "DDS_RETCODE_PRECONDITION_NOT_MET",
24  "DDS_RETCODE_OUT_OF_RESOURCES",
25  "DDS_RETCODE_NOT_ENABLED",
26  "DDS_RETCODE_IMMUTABLE_POLICY",
27  "DDS_RETCODE_INCONSISTENT_POLICY",
28  "DDS_RETCODE_ALREADY_DELETED",
29  "DDS_RETCODE_TIMEOUT",
30  "DDS_RETCODE_NO_DATA",
31  "DDS_RETCODE_ILLEGAL_OPERATION" };
32 
33 const char * madara::transport::SpliceDDSTransport::partition_ = "Madara_knowledge";
34 
36  const std::string & id,
38  TransportSettings & config, bool launch_transport)
39  : madara::transport::Base (id, config, context),
40  domain_ (0), domain_factory_ (0),
41  domain_participant_ (0), publisher_ (0), subscriber_ (0),
42  datawriter_ (0), datareader_ (0),
43  update_writer_ (0), update_reader_ (0),
44  update_topic_ (0)
45 {
46  // create a reference to the knowledge base for threading
47  knowledge_.use (context);
48 
49  // set the data plane for the read threads
51 
52  if (launch_transport)
53  setup ();
54 }
56 {
57  close ();
58 }
59 
60 void
62 {
63  this->invalidate_transport ();
64 
66 
68 
69  //if (subscriber_.in ())
70  //{
71  // subscriber_->delete_datareader (update_reader_);
72  //}
73 
74  //if (publisher_.in ())
75  //{
76  // publisher_->delete_datawriter (update_writer_);
77  // publisher_->delete_datawriter (latency_update_writer_);
78  //}
79 
80 
81  //if (domain_participant_.in ())
82  //{
83  // domain_participant_->delete_subscriber (subscriber_);
84  // domain_participant_->delete_publisher (publisher_);
85  // domain_participant_->delete_topic (update_topic_);
86  //}
87 
88  //if (domain_factory_.in ())
89  // domain_factory_->delete_participant (domain_participant_);
90 
91  update_reader_ = 0;
92  update_writer_ = 0;
93  update_topic_ = 0;
94  subscriber_ = 0;
95  publisher_ = 0;
97  domain_factory_ = 0;
98 
99  this->shutting_down_ = false;
100 }
101 
102 int
104 {
105  return this->settings_.reliability;
106 }
107 
108 int
110 {
111  return this->settings_.reliability = setting;
112 }
113 
114 int
116 {
117  Base::setup ();
118  DDS::ReturnCode_t status;
119 
120  this->is_valid_ = false;
121 
122  // reset the valid setup flag
123  //valid_setup_ = false;
124 
126  "SpliceDDSTransport::setup:" \
127  " Creating a participant for topic (%s)\n",
129 
131  "SpliceDDSTransport::setup:" \
132  " Participant settings are being read from the OSPL_URI environment"
133  " variable\n",
135 
136  // get the domain participant factory
137  domain_factory_ = DDS::DomainParticipantFactory::get_instance ();
138  domain_factory_->get_default_participant_qos (part_qos_);
139  domain_participant_ = domain_factory_->create_participant (
140  domain_,
141  part_qos_, NULL, DDS::STATUS_MASK_NONE);
142 
143  // if dp == NULL, we've got an error
144  if (domain_participant_ == NULL)
145  {
147  "\nSpliceDDSTransport::setup:" \
148  " splice daemon not running. Try 'ospl start'...\n");
149 
150  return -2;
151  }
152 
153  domain_participant_->get_default_topic_qos(topic_qos_);
154 
156  {
157  topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
158  topic_qos_.history.depth = this->settings_.queue_length;
159  topic_qos_.resource_limits.max_samples_per_instance =
160  this->settings_.queue_length;
161  topic_qos_.resource_limits.max_samples = this->settings_.queue_length;
162  topic_qos_.destination_order.kind =
163  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
164  //topic_qos_.
165  }
166  //topic_qos_.resource_limits.max_samples_per_instance= 10;
167  domain_participant_->set_default_topic_qos(topic_qos_);
168 
169 
171  "SpliceDDSTransport::setup:" \
172  " Registering type support\n");
173 
174  // Register Update type
175  status = this->update_type_support_.register_type (
176  domain_participant_, "Knowledge::Update");
177  if (int ret = check_status(status, "Knowledge::UpdateTypeSupport::register_type") < 0)
178  return ret;
179 
180  // Register Mutex type
181  //status = this->mutex_type_support_.register_type (
182  // domain_participant_, "Knowledge::Mutex");
183  //check_status(status, "Knowledge::MutexTypeSupport::register_type");
184 
186  "SpliceDDSTransport::setup:" \
187  " Setting up knowledge domain via topic (%s)\n",
189 
190  // Create Update topic
191  update_topic_ = domain_participant_->create_topic (
193  "Knowledge::Update",
194  topic_qos_, NULL, DDS::STATUS_MASK_NONE);
195 
196  if (int ret = check_handle(update_topic_,
197  "DDS::DomainParticipant::create_topic (KnowledgeUpdate)") < 0)
198  return ret;
199 
200 
201  // Get default qos for publisher
202  status = domain_participant_->get_default_publisher_qos (pub_qos_);
203  if (int ret = check_status(status, "DDS::DomainParticipant::get_default_publisher_qos") < 0)
204  return ret;
205 
206 
208  {
209  pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
210  pub_qos_.presentation.coherent_access = true;
211  pub_qos_.presentation.ordered_access = false;
212  //topic_qos_.
213  }
214 
216  "SpliceDDSTransport::setup:" \
217  " Creating publisher for topic (%s)\n",
219 
220  // Create publisher
221  pub_qos_.partition.name.length (1);
222  pub_qos_.partition.name[0] = DDS::string_dup (partition_);
223  publisher_ = domain_participant_->create_publisher (
224  pub_qos_, NULL, DDS::STATUS_MASK_NONE);
225  if (int ret = check_handle(publisher_, "DDS::DomainParticipant::create_publisher") < 0)
226  return ret;
227 
228  // Create subscriber
229  status = domain_participant_->get_default_subscriber_qos (sub_qos_);
230  if (int ret = check_status(status, "DDS::DomainParticipant::get_default_subscriber_qos") < 0)
231  return ret;
232 
233 
235  {
236  sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
237  sub_qos_.presentation.coherent_access = true;
238  sub_qos_.presentation.ordered_access = false;
239  }
240 
242  "SpliceDDSTransport::setup:" \
243  " Creating subscriber for topic (%s)\n",
245 
246  sub_qos_.partition.name.length (1);
247  sub_qos_.partition.name[0] = DDS::string_dup (partition_);
248  subscriber_ = domain_participant_->create_subscriber (
249 // sub_qos_, &sub_listener_, DDS::DATA_AVAILABLE_STATUS | DDS::DATA_ON_READERS_STATUS);
250  sub_qos_, NULL, DDS::STATUS_MASK_NONE);
251  if (int ret = check_handle(subscriber_, "DDS::DomainParticipant::create_subscriber") < 0)
252  return ret;
253 
254  if (!subscriber_ || !publisher_)
255  {
257  "SpliceDDSTransport::setup:" \
258  " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
259 
260  return -2;
261  }
262 
263  // Create datawriter
264  publisher_->get_default_datawriter_qos (datawriter_qos_);
265  publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
266 
268  {
270  "SpliceDDSTransport::setup:" \
271  " Enabling reliable transport for (%s) datawriters\n",
273 
274  datawriter_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
275  datawriter_qos_.history.depth = this->settings_.queue_length;
276  datawriter_qos_.resource_limits.max_samples = this->settings_.queue_length;
277  datawriter_qos_.resource_limits.max_samples_per_instance =
278  this->settings_.queue_length;
279  datawriter_qos_.destination_order.kind =
280  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
281  }
282  else
283  {
285  "SpliceDDSTransport::setup:" \
286  " Enabling unreliable transport for (%s) datawriters\n",
288  }
289 
291  "SpliceDDSTransport::setup:" \
292  " Creating datawriter for topic (%s)\n",
294 
295  // Create Update writer
296  datawriter_ = publisher_->create_datawriter (update_topic_,
297  datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
298  if (int ret = check_handle(datawriter_, "DDS::Publisher::create_datawriter (Update)") < 0)
299  return ret;
300  update_writer_ = dynamic_cast<Knowledge::UpdateDataWriter_ptr> (datawriter_.in ());
301  if (int ret = check_handle(update_writer_, "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
302  return ret;
303 
304  // Create Latency Update writer for Read Thread
305  latencywriter_ = publisher_->create_datawriter (update_topic_,
306  datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
307  if (int ret = check_handle(latencywriter_, "DDS::Publisher::create_datawriter (Update)") < 0)
308  return ret;
309  latency_update_writer_ = dynamic_cast<Knowledge::UpdateDataWriter_ptr> (latencywriter_.in ());
310  if (int ret = check_handle(latency_update_writer_, "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
311  return ret;
312 
313 
314  // Create datareader
315  status = subscriber_->get_default_datareader_qos (datareader_qos_);
316  subscriber_->copy_from_topic_qos (datareader_qos_, topic_qos_);
317  //publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
318  if (int ret = check_status(status, "DDS::Subscriber::get_default_datareader_qos") < 0)
319  return ret;
320 
321  datareader_qos_.reader_data_lifecycle.enable_invalid_samples = FALSE;
322 
324  {
326  "SpliceDDSTransport::setup:" \
327  " Enabling reliable transport for (%s) datareaders\n",
329 
330  datareader_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
331  datareader_qos_.history.depth = this->settings_.queue_length;
332  datareader_qos_.resource_limits.max_samples = this->settings_.queue_length;
333  datareader_qos_.destination_order.kind =
334  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
335 
336  // unlike the other qos, we do not set max_samples_per_instance here.
337  // that shouldn't be as necessary, since we are using take on the reader
338  }
339  else
340  {
342  "SpliceDDSTransport::setup:" \
343  " Enabling unreliable transport for (%s) datareaders\n",
345  }
346 
348  "SpliceDDSTransport::setup:" \
349  " Creating datareader for topic (%s)\n",
351 
352  // Create Update datareader
353  datareader_ = subscriber_->create_datareader (update_topic_,
354  //datareader_qos_, &dr_listener_, DDS::STATUS_MASK_NONE);
355  datareader_qos_, NULL, DDS::STATUS_MASK_NONE);
356 
357  // notes: we set the mask to none because the listener will be called
358  // by the subscriber listener with notify_datareaders. This is the Splice
359  // way of doing this, since we require subscription information and they
360  // have so far not implemented on_subscription_matched.
361 
362  if (int ret = check_handle(datareader_, "DDS::Subscriber::create_datareader (Update)") < 0)
363  return ret;
364  update_reader_ = dynamic_cast<Knowledge::UpdateDataReader_ptr>(datareader_.in ());
365  if (int ret = check_handle(update_reader_, "Knowledge::UpdateDataReader_ptr::narrow") < 0)
366  return ret;
367 
368  if (!settings_.no_receiving)
369  {
370  double hertz = settings_.read_thread_hertz;
371  if (hertz < 0.0)
372  {
373  hertz = 0.0;
374  }
375 
377  "UdpTransportReadThread::setup:" \
378  " starting %d threads at %f hertz\n", settings_.read_threads,
379  hertz);
380 
381  for (uint32_t i = 0; i < settings_.read_threads; ++i)
382  {
383  std::stringstream thread_name;
384  thread_name << "read";
385  thread_name << i;
386 
387  read_threads_.run (hertz, thread_name.str (),
391  }
392  }
393 
394  return this->validate_transport ();
395 }
396 
397 long
399  const knowledge::VariableReferenceMap & updates)
400 {
401  long result = 0;
402 
403  if (!settings_.no_sending)
404  {
405  result = prep_send (updates, "SpliceDDSTransport::send_data:");
406 
407  // get the maximum quality from the updates
408  uint32_t quality = knowledge::max_quality (updates);
409 
411  unsigned long long cur_clock = context_.get_clock ();
412 
413  DDS::ReturnCode_t dds_result;
414  DDS::InstanceHandle_t handle;
415 
416  Knowledge::Update data;
417 
418  data.buffer = Knowledge::seq_oct (result, result, (unsigned char *)buffer_.get_ptr ());
419  data.clock = cur_clock;
420  data.quality = quality;
421  data.updates = DDS::ULong (updates.size ());
422  data.originator = DDS::string_dup (id_.c_str ());
423  data.type = madara::transport::MULTIASSIGN;
424  data.ttl = settings_.get_rebroadcast_ttl ();
425  data.timestamp = utility::get_time ();
426  data.madara_id = DDS::string_dup (MADARA_IDENTIFIER);
427 
429  "SpliceDDSTransport::send:" \
430  " sending multiassignment: %d updates, time=llu, quality=%d\n",
431  data.updates, cur_clock, quality);
432 
433  handle = update_writer_->register_instance (data);
434  dds_result = update_writer_->write (data, handle);
435  result = (long)dds_result;
436  //update_writer_->unregister_instance (data, handle);
437  }
438 
439  return result;
440 }
441 
442 int
444  const char * info)
445 {
446  if (!handle)
447  {
449  "SpliceDDSTransport::check_handle:" \
450  " error in %s: Creation failed: invalid handle\n", info);
451 
452  return -2;
453  }
454 
455  return 0;
456 }
457 
458 int
460  const char * info)
461 {
462  // if the status is okay, then return without issue
463  if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
464  return 0;
465 
467  "SpliceDDSTransport::check_status:" \
468  " error in %s: Creation failed: %s\n",
469  info, get_error_name (status));
470 
471  return -2;
472 }
473 
474 const char *
476 {
477  return ret_code_names[status];
478 }
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
volatile bool is_valid_
Definition: Transport.h:126
QoSTransportSettings settings_
Definition: Transport.h:133
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:22
int reliability(void) const
Accesses reliability setting.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:33
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
int64_t get_time(void)
Returns a time of day in nanoseconds. If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:253
int check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
This class stores variables and their values for use by any entity needing state information in a thr...
DDS::DomainParticipant_var domain_participant_
Knowledge::UpdateDataReader_var update_reader_
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
double read_thread_hertz
number of valid messages allowed to be received per second.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
Holds basic transport settings.
Knowledge::UpdateDataWriter_var update_writer_
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
DDS::DomainParticipantFactory_var domain_factory_
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:148
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:71
long send_data(const knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
static struct madara::knowledge::tags::string_t string
threads::Threader read_threads_
threads for reading knowledge updates
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
std::string write_domain
All class members are accessible to users for easy setup.
volatile bool shutting_down_
Definition: Transport.h:127
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
bool no_sending
if true, never send over transport
Knowledge::UpdateDataWriter_var latency_update_writer_
std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
Definition: Utility.inl:72
Knowledge::UpdateTypeSupport update_type_support_
int setup(void)
Activates this transport.
Thread for reading knowledge updates via waitsets.
const std::string id_
host:port identifier of this process
Definition: Transport.h:131
uint32_t read_threads
the number of read threads to start
void close(void)
Closes this transport.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:180
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void set_data_plane(knowledge::KnowledgeBase data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:173
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:151
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:154
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:30
Base class from which all transports must be derived.
Definition: Transport.h:45
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:812
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:201
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:145
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:136
int check_handle(void *handle, const char *info)
Splice handle checker.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.