14 const char* madara::transport::NddsTransport::topic_names_[] = {
15 "MADARA_KaRL_Data",
"MADARA_KaRL_Control"};
17 const char* madara::transport::NddsTransport::partition_ =
"Madara_knowledge";
19 madara::transport::NddsTransport::NddsTransport(
const std::string&
id,
21 bool launch_transport)
22 :
madara::transport::Base(id, config, context),
23 domain_participant_(0),
32 madara::transport::NddsTransport::~NddsTransport()
37 void madara::transport::NddsTransport::close(
void)
40 this->invalidate_transport();
44 subscriber_->delete_datareader(data_reader_);
49 publisher_->delete_datawriter(data_writer_);
52 if (domain_participant_)
54 domain_participant_->delete_subscriber(subscriber_);
55 domain_participant_->delete_publisher(publisher_);
56 domain_participant_->delete_topic(update_topic_);
59 if (domain_participant_)
64 rc = domain_participant_->delete_contained_entities();
65 if (rc != DDS_RETCODE_OK)
68 "NddsTransport::close: unable to delete participant entities\n");
71 rc = DDSDomainParticipantFactory::get_instance()->delete_participant(
73 if (rc != DDS_RETCODE_OK)
76 "NddsTransport::close: unable to delete participant\n");
80 domain_participant_ = 0;
86 this->shutting_down_ =
false;
89 int madara::transport::NddsTransport::reliability(
void)
const
91 return this->settings_.reliability;
94 int madara::transport::NddsTransport::reliability(
const int& setting)
96 return this->settings_.reliability = setting;
99 int madara::transport::NddsTransport::setup(
void)
104 DDS_DomainId_t domain = 0;
105 this->is_valid_ =
false;
107 std::stringstream domainreader;
108 domainreader << this->settings_.write_domain;
109 domainreader >> domain;
113 domain_participant_ =
114 DDSDomainParticipantFactory::get_instance()->create_participant(domain,
115 DDS_PARTICIPANT_QOS_DEFAULT, NULL,
116 DDS_STATUS_MASK_NONE);
118 if (domain_participant_ == NULL)
121 "NddsTransport::setup:"
122 " Unable to start the NDDS transport. Exiting...\n");
127 DDS_TopicQos topic_qos;
128 domain_participant_->get_default_topic_qos(topic_qos);
132 topic_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
133 topic_qos.history.depth = this->settings_.queue_length;
134 topic_qos.resource_limits.max_samples_per_instance =
135 this->settings_.queue_length;
136 topic_qos.resource_limits.max_samples = this->settings_.queue_length;
137 topic_qos.destination_order.kind =
138 DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
141 domain_participant_->set_default_topic_qos(topic_qos);
144 rc = Ndds_Knowledge_UpdateTypeSupport::register_type(
145 domain_participant_, Ndds_Knowledge_UpdateTypeSupport::get_type_name());
147 if (rc != DDS_RETCODE_OK)
150 "NddsTransport::setup:"
151 " Unable to register the knowledge update data type. Exiting...\n");
157 update_topic_ = domain_participant_->create_topic(topic_names_[0],
158 Ndds_Knowledge_UpdateTypeSupport::get_type_name(), topic_qos,
160 DDS_STATUS_MASK_NONE);
161 if (update_topic_ == 0)
164 "NddsTransport::setup:"
165 " Unable to create topic. Exiting...\n");
170 DDS_PublisherQos pub_qos;
172 domain_participant_->get_default_publisher_qos(pub_qos);
176 pub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
177 pub_qos.presentation.coherent_access =
true;
178 pub_qos.presentation.ordered_access =
false;
183 domain_participant_->create_publisher(pub_qos, NULL,
184 DDS_STATUS_MASK_NONE);
188 "NddsTransport::setup:"
189 " Unable to create publisher_. Exiting...\n");
193 DDS_DataWriterQos datawriter_qos;
194 publisher_->get_default_datawriter_qos(datawriter_qos);
195 publisher_->copy_from_topic_qos(datawriter_qos, topic_qos);
198 data_writer_ = publisher_->create_datawriter(update_topic_, datawriter_qos,
200 DDS_STATUS_MASK_NONE);
201 if (data_writer_ == 0)
204 "NddsTransport::setup:"
205 " Unable to create topic data writer. Exiting...\n");
211 update_writer_ = Ndds_Knowledge_UpdateDataWriter::narrow(data_writer_);
212 if (update_writer_ == 0)
215 "NddsTransport::setup:"
216 " Unable to create narrowed data writer. Exiting...\n");
221 DDS_SubscriberQos sub_qos;
225 sub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
226 sub_qos.presentation.coherent_access =
true;
227 sub_qos.presentation.ordered_access =
false;
232 domain_participant_->create_subscriber(sub_qos, NULL,
233 DDS_STATUS_MASK_NONE);
234 if (subscriber_ == 0)
237 "NddsTransport::setup:"
238 " Unable to create subscriber_. Exiting...\n");
243 DDS_DataReaderQos datareader_qos;
244 subscriber_->get_default_datareader_qos(datareader_qos);
245 subscriber_->copy_from_topic_qos(datareader_qos, topic_qos);
249 datareader_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
250 datareader_qos.history.depth = this->settings_.queue_length;
251 datareader_qos.resource_limits.max_samples = this->settings_.queue_length;
252 datareader_qos.destination_order.kind =
253 DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
256 listener_ =
new NddsListener(settings_, id_,
context_, send_monitor_,
257 receive_monitor_, packet_scheduler_);
260 data_reader_ = subscriber_->create_datareader(
261 update_topic_, datareader_qos, listener_, DDS_STATUS_MASK_ALL);
262 if (data_reader_ == 0)
265 "NddsTransport::setup:"
266 " Unable to create reader. Leaving thread...\n");
271 this->validate_transport();
276 long madara::transport::NddsTransport::send_data(
279 long result = prep_send(updates,
"NddsTransport::send_data:");
285 unsigned long long cur_clock =
context_.get_clock();
289 std::stringstream buffer;
290 Ndds_Knowledge_Update data;
292 Ndds_Knowledge_Update_initialize(&data);
294 data.clock = cur_clock;
295 data.quality = quality;
297 data.buffer.ensure_length(result, result);
298 data.buffer.from_array((DDS_Octet*)buffer_.get_ptr(), result);
299 data.clock = cur_clock;
300 data.quality = quality;
301 data.updates = DDS_UnsignedLong(updates.size());
302 data.originator =
new char[id_.size() + 1];
305 data.ttl = settings_.get_rebroadcast_ttl();
311 "NddsTransport::send:"
312 " sending multiassignment: %d updates, time=%llu, quality=%d\n",
313 data.updates, cur_clock, quality);
315 DDS_InstanceHandle_t handle = update_writer_->register_instance(data);
316 rc = update_writer_->write(data, handle);
318 Ndds_Knowledge_Update_finalize(&data);
const ThreadSafeContext * context_
This class stores variables and their values for use by any entity needing state information in a thr...
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Copyright(c) 2020 Galois.