MADARA  3.4.1
NddsTransport.cpp
Go to the documentation of this file.
1 #ifdef _MADARA_NDDS_
2 
4 //#include "madara/transport/ndds/NddsTransportReadThread.h"
8 
9 namespace logger = madara::logger;
10 
11 #include <iostream>
12 #include <sstream>
13 
14 const char* madara::transport::NddsTransport::topic_names_[] = {
15  "MADARA_KaRL_Data", "MADARA_KaRL_Control"};
16 
17 const char* madara::transport::NddsTransport::partition_ = "Madara_knowledge";
18 
19 madara::transport::NddsTransport::NddsTransport(const std::string& id,
20  knowledge::ThreadSafeContext& context, TransportSettings& config,
21  bool launch_transport)
22  : madara::transport::Base(id, config, context),
23  domain_participant_(0),
24  update_topic_(0),
25  update_writer_(0),
26  listener_(0)
27 {
28  if (launch_transport)
29  setup();
30 }
31 
32 madara::transport::NddsTransport::~NddsTransport()
33 {
34  close();
35 }
36 
37 void madara::transport::NddsTransport::close(void)
38 {
39  DDS_ReturnCode_t rc;
40  this->invalidate_transport();
41 
42  if (subscriber_)
43  {
44  subscriber_->delete_datareader(data_reader_);
45  }
46 
47  if (publisher_)
48  {
49  publisher_->delete_datawriter(data_writer_);
50  }
51 
52  if (domain_participant_)
53  {
54  domain_participant_->delete_subscriber(subscriber_);
55  domain_participant_->delete_publisher(publisher_);
56  domain_participant_->delete_topic(update_topic_);
57  }
58 
59  if (domain_participant_)
60  {
61  /* Perform a clean shutdown of the participant and all the contained
62  * entities
63  */
64  rc = domain_participant_->delete_contained_entities();
65  if (rc != DDS_RETCODE_OK)
66  {
67  context_.get_logger().log(logger::LOG_MINOR,
68  "NddsTransport::close: unable to delete participant entities\n");
69  }
70 
71  rc = DDSDomainParticipantFactory::get_instance()->delete_participant(
72  domain_participant_);
73  if (rc != DDS_RETCODE_OK)
74  {
75  context_.get_logger().log(logger::LOG_MINOR,
76  "NddsTransport::close: unable to delete participant\n");
77  }
78  }
79 
80  domain_participant_ = 0;
81  subscriber_ = 0;
82  publisher_ = 0;
83  update_writer_ = 0;
84  update_topic_ = 0;
85 
86  this->shutting_down_ = false;
87 }
88 
89 int madara::transport::NddsTransport::reliability(void) const
90 {
91  return this->settings_.reliability;
92 }
93 
94 int madara::transport::NddsTransport::reliability(const int& setting)
95 {
96  return this->settings_.reliability = setting;
97 }
98 
99 int madara::transport::NddsTransport::setup(void)
100 {
101  Base::setup();
102 
103  DDS_ReturnCode_t rc;
104  DDS_DomainId_t domain = 0;
105  this->is_valid_ = false;
106 
107  std::stringstream domainreader;
108  domainreader << this->settings_.write_domain;
109  domainreader >> domain;
110 
111  // create the domain participant
112 
113  domain_participant_ =
114  DDSDomainParticipantFactory::get_instance()->create_participant(domain,
115  DDS_PARTICIPANT_QOS_DEFAULT, NULL, /* Listener */
116  DDS_STATUS_MASK_NONE);
117 
118  if (domain_participant_ == NULL)
119  {
120  context_.get_logger().log(logger::LOG_ERROR,
121  "NddsTransport::setup:"
122  " Unable to start the NDDS transport. Exiting...\n");
123 
124  return -2;
125  }
126 
127  DDS_TopicQos topic_qos;
128  domain_participant_->get_default_topic_qos(topic_qos);
129 
130  if (madara::transport::RELIABLE == this->settings_.reliability)
131  {
132  topic_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
133  topic_qos.history.depth = this->settings_.queue_length;
134  topic_qos.resource_limits.max_samples_per_instance =
135  this->settings_.queue_length;
136  topic_qos.resource_limits.max_samples = this->settings_.queue_length;
137  topic_qos.destination_order.kind =
138  DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
139  }
140  // topic_qos_.resource_limits.max_samples_per_instance= 10;
141  domain_participant_->set_default_topic_qos(topic_qos);
142 
143  // register the Knowledge Update Type
144  rc = Ndds_Knowledge_UpdateTypeSupport::register_type(
145  domain_participant_, Ndds_Knowledge_UpdateTypeSupport::get_type_name());
146 
147  if (rc != DDS_RETCODE_OK)
148  {
149  context_.get_logger().log(logger::LOG_ERROR,
150  "NddsTransport::setup:"
151  " Unable to register the knowledge update data type. Exiting...\n");
152 
153  return -2;
154  }
155 
156  // create the knowledge topic
157  update_topic_ = domain_participant_->create_topic(topic_names_[0],
158  Ndds_Knowledge_UpdateTypeSupport::get_type_name(), topic_qos,
159  NULL, /* listener */
160  DDS_STATUS_MASK_NONE);
161  if (update_topic_ == 0)
162  {
163  context_.get_logger().log(logger::LOG_ERROR,
164  "NddsTransport::setup:"
165  " Unable to create topic. Exiting...\n");
166 
167  return -2;
168  }
169 
170  DDS_PublisherQos pub_qos;
171 
172  domain_participant_->get_default_publisher_qos(pub_qos);
173 
174  if (madara::transport::RELIABLE == this->settings_.reliability)
175  {
176  pub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
177  pub_qos.presentation.coherent_access = true;
178  pub_qos.presentation.ordered_access = false;
179  }
180 
181  // create the publisher_
182  publisher_ =
183  domain_participant_->create_publisher(pub_qos, NULL, /* listener */
184  DDS_STATUS_MASK_NONE);
185  if (publisher_ == 0)
186  {
187  context_.get_logger().log(logger::LOG_ERROR,
188  "NddsTransport::setup:"
189  " Unable to create publisher_. Exiting...\n");
190  return -2;
191  }
192 
193  DDS_DataWriterQos datawriter_qos;
194  publisher_->get_default_datawriter_qos(datawriter_qos);
195  publisher_->copy_from_topic_qos(datawriter_qos, topic_qos);
196 
197  // create a topic data writer
198  data_writer_ = publisher_->create_datawriter(update_topic_, datawriter_qos,
199  NULL, /* listener */
200  DDS_STATUS_MASK_NONE);
201  if (data_writer_ == 0)
202  {
203  context_.get_logger().log(logger::LOG_ERROR,
204  "NddsTransport::setup:"
205  " Unable to create topic data writer. Exiting...\n");
206 
207  return -2;
208  }
209 
210  // create the specialized data writer for our data type
211  update_writer_ = Ndds_Knowledge_UpdateDataWriter::narrow(data_writer_);
212  if (update_writer_ == 0)
213  {
214  context_.get_logger().log(logger::LOG_ERROR,
215  "NddsTransport::setup:"
216  " Unable to create narrowed data writer. Exiting...\n");
217 
218  return -2;
219  }
220 
221  DDS_SubscriberQos sub_qos;
222 
223  if (madara::transport::RELIABLE == this->settings_.reliability)
224  {
225  sub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
226  sub_qos.presentation.coherent_access = true;
227  sub_qos.presentation.ordered_access = false;
228  }
229 
230  // create a subscriber_
231  subscriber_ =
232  domain_participant_->create_subscriber(sub_qos, NULL, /* listener */
233  DDS_STATUS_MASK_NONE);
234  if (subscriber_ == 0)
235  {
236  context_.get_logger().log(logger::LOG_ERROR,
237  "NddsTransport::setup:"
238  " Unable to create subscriber_. Exiting...\n");
239 
240  return -2;
241  }
242 
243  DDS_DataReaderQos datareader_qos;
244  subscriber_->get_default_datareader_qos(datareader_qos);
245  subscriber_->copy_from_topic_qos(datareader_qos, topic_qos);
246 
247  if (madara::transport::RELIABLE == this->settings_.reliability)
248  {
249  datareader_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
250  datareader_qos.history.depth = this->settings_.queue_length;
251  datareader_qos.resource_limits.max_samples = this->settings_.queue_length;
252  datareader_qos.destination_order.kind =
253  DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
254  }
255 
256  listener_ = new NddsListener(settings_, id_, context_, send_monitor_,
257  receive_monitor_, packet_scheduler_);
258 
259  // create a reader for the topic
260  data_reader_ = subscriber_->create_datareader(
261  update_topic_, datareader_qos, listener_, DDS_STATUS_MASK_ALL);
262  if (data_reader_ == 0)
263  {
264  context_.get_logger().log(logger::LOG_ERROR,
265  "NddsTransport::setup:"
266  " Unable to create reader. Leaving thread...\n");
267 
268  return -2;
269  }
270 
271  this->validate_transport();
272 
273  return 0;
274 }
275 
276 long madara::transport::NddsTransport::send_data(
277  const knowledge::KnowledgeMap& updates)
278 {
279  long result = prep_send(updates, "NddsTransport::send_data:");
280 
281  // get the maximum quality from the updates
282  uint32_t quality = knowledge::max_quality(updates);
283 
285  unsigned long long cur_clock = context_.get_clock();
286 
287  DDS_ReturnCode_t rc;
288 
289  std::stringstream buffer;
290  Ndds_Knowledge_Update data;
291 
292  Ndds_Knowledge_Update_initialize(&data);
293 
294  data.clock = cur_clock;
295  data.quality = quality;
296 
297  data.buffer.ensure_length(result, result);
298  data.buffer.from_array((DDS_Octet*)buffer_.get_ptr(), result);
299  data.clock = cur_clock;
300  data.quality = quality;
301  data.updates = DDS_UnsignedLong(updates.size());
302  data.originator = new char[id_.size() + 1];
303  utility::strncpy_safe(data.originator, id_.c_str(), id_.size() + 1);
304  data.type = madara::transport::MULTIASSIGN;
305  data.ttl = settings_.get_rebroadcast_ttl();
306  data.timestamp = utility::get_time();
307  data.madara_id = new char[strlen(MADARA_IDENTIFIER) + 1];
308  utility::strncpy_safe(data.madara_id, MADARA_IDENTIFIER, strlen(MADARA_IDENTIFIER) + 1);
309 
310  context_.get_logger().log(logger::LOG_MAJOR,
311  "NddsTransport::send:"
312  " sending multiassignment: %d updates, time=%llu, quality=%d\n",
313  data.updates, cur_clock, quality);
314 
315  DDS_InstanceHandle_t handle = update_writer_->register_instance(data);
316  rc = update_writer_->write(data, handle);
317 
318  Ndds_Knowledge_Update_finalize(&data);
319 
320  return rc;
321 }
322 
323 #endif // _MADARA_NDDS_
324 
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:22
const ThreadSafeContext * context_
This class stores variables and their values for use by any entity needing state information in a thr...
constexpr string_t string
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:12
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Definition: Utility.inl:265
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Definition: Utility.cpp:376
Copyright(c) 2020 Galois.