13 "MADARA_KaRL_Data",
"MADARA_KaRL_Control"};
17 "DDS_RETCODE_OK",
"DDS_RETCODE_ERROR",
"DDS_RETCODE_UNSUPPORTED",
18 "DDS_RETCODE_BAD_PARAMETER",
"DDS_RETCODE_PRECONDITION_NOT_MET",
19 "DDS_RETCODE_OUT_OF_RESOURCES",
"DDS_RETCODE_NOT_ENABLED",
20 "DDS_RETCODE_IMMUTABLE_POLICY",
"DDS_RETCODE_INCONSISTENT_POLICY",
21 "DDS_RETCODE_ALREADY_DELETED",
"DDS_RETCODE_TIMEOUT",
"DDS_RETCODE_NO_DATA",
22 "DDS_RETCODE_ILLEGAL_OPERATION"};
29 bool launch_transport)
30 :
madara::transport::
Base(id, config, context),
33 domain_participant_(0),
58 this->invalidate_transport();
60 read_threads_.terminate();
90 domain_participant_ = 0;
93 this->shutting_down_ =
false;
98 return this->settings_.reliability;
103 return this->settings_.reliability = setting;
109 DDS::ReturnCode_t status;
111 this->is_valid_ =
false;
117 "SpliceDDSTransport::setup:"
118 " Creating a participant for topic (%s)\n",
122 "SpliceDDSTransport::setup:"
123 " Participant settings are being read from the OSPL_URI environment"
128 domain_factory_ = DDS::DomainParticipantFactory::get_instance();
129 domain_factory_->get_default_participant_qos(part_qos_);
130 domain_participant_ = domain_factory_->create_participant(
131 domain_, part_qos_, NULL, DDS::STATUS_MASK_NONE);
134 if (domain_participant_ == NULL)
137 "\nSpliceDDSTransport::setup:"
138 " splice daemon not running. Try 'ospl start'...\n");
143 domain_participant_->get_default_topic_qos(topic_qos_);
147 topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
148 topic_qos_.history.depth = this->settings_.queue_length;
149 topic_qos_.resource_limits.max_samples_per_instance =
150 this->settings_.queue_length;
151 topic_qos_.resource_limits.max_samples = this->settings_.queue_length;
152 topic_qos_.destination_order.kind =
153 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
157 domain_participant_->set_default_topic_qos(topic_qos_);
160 "SpliceDDSTransport::setup:"
161 " Registering type support\n");
164 status = this->update_type_support_.register_type(
165 domain_participant_,
"Knowledge::Update");
166 if (
int ret = check_status(
167 status,
"Knowledge::UpdateTypeSupport::register_type") < 0)
176 "SpliceDDSTransport::setup:"
177 " Setting up knowledge domain via topic (%s)\n",
181 update_topic_ = domain_participant_->create_topic(
183 "Knowledge::Update", topic_qos_, NULL, DDS::STATUS_MASK_NONE);
186 check_handle(update_topic_,
187 "DDS::DomainParticipant::create_topic (KnowledgeUpdate)") < 0)
191 status = domain_participant_->get_default_publisher_qos(pub_qos_);
192 if (
int ret = check_status(status,
193 "DDS::DomainParticipant::get_default_publisher_qos") < 0)
198 pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
199 pub_qos_.presentation.coherent_access =
true;
200 pub_qos_.presentation.ordered_access =
false;
205 "SpliceDDSTransport::setup:"
206 " Creating publisher for topic (%s)\n",
210 pub_qos_.partition.name.length(1);
211 pub_qos_.partition.name[0] = DDS::string_dup(partition_);
212 publisher_ = domain_participant_->create_publisher(
213 pub_qos_, NULL, DDS::STATUS_MASK_NONE);
214 if (
int ret = check_handle(
215 publisher_,
"DDS::DomainParticipant::create_publisher") < 0)
219 status = domain_participant_->get_default_subscriber_qos(sub_qos_);
220 if (
int ret = check_status(status,
221 "DDS::DomainParticipant::get_default_subscriber_qos") < 0)
226 sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
227 sub_qos_.presentation.coherent_access =
true;
228 sub_qos_.presentation.ordered_access =
false;
232 "SpliceDDSTransport::setup:"
233 " Creating subscriber for topic (%s)\n",
236 sub_qos_.partition.name.length(1);
237 sub_qos_.partition.name[0] = DDS::string_dup(partition_);
238 subscriber_ = domain_participant_->create_subscriber(
241 sub_qos_, NULL, DDS::STATUS_MASK_NONE);
242 if (
int ret = check_handle(subscriber_,
243 "DDS::DomainParticipant::create_subscriber") < 0)
246 if (!subscriber_ || !publisher_)
249 "SpliceDDSTransport::setup:"
250 " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
256 publisher_->get_default_datawriter_qos(datawriter_qos_);
257 publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
262 "SpliceDDSTransport::setup:"
263 " Enabling reliable transport for (%s) datawriters\n",
266 datawriter_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
267 datawriter_qos_.history.depth = this->settings_.queue_length;
268 datawriter_qos_.resource_limits.max_samples = this->settings_.queue_length;
269 datawriter_qos_.resource_limits.max_samples_per_instance =
270 this->settings_.queue_length;
271 datawriter_qos_.destination_order.kind =
272 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
277 "SpliceDDSTransport::setup:"
278 " Enabling unreliable transport for (%s) datawriters\n",
283 "SpliceDDSTransport::setup:"
284 " Creating datawriter for topic (%s)\n",
288 datawriter_ = publisher_->create_datawriter(
289 update_topic_, datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
290 if (
int ret = check_handle(datawriter_,
291 "DDS::Publisher::create_datawriter (Update)") < 0)
294 dynamic_cast<Knowledge::UpdateDataWriter_ptr
>(datawriter_.in());
295 if (
int ret = check_handle(update_writer_,
296 "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
300 latencywriter_ = publisher_->create_datawriter(
301 update_topic_, datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
302 if (
int ret = check_handle(latencywriter_,
303 "DDS::Publisher::create_datawriter (Update)") < 0)
305 latency_update_writer_ =
306 dynamic_cast<Knowledge::UpdateDataWriter_ptr
>(latencywriter_.in());
307 if (
int ret = check_handle(latency_update_writer_,
308 "Knowledge::UpdateDataWriter_ptr::narrow") < 0)
312 status = subscriber_->get_default_datareader_qos(datareader_qos_);
313 subscriber_->copy_from_topic_qos(datareader_qos_, topic_qos_);
315 if (
int ret = check_status(
316 status,
"DDS::Subscriber::get_default_datareader_qos") < 0)
319 datareader_qos_.reader_data_lifecycle.enable_invalid_samples = FALSE;
324 "SpliceDDSTransport::setup:"
325 " Enabling reliable transport for (%s) datareaders\n",
328 datareader_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
329 datareader_qos_.history.depth = this->settings_.queue_length;
330 datareader_qos_.resource_limits.max_samples = this->settings_.queue_length;
331 datareader_qos_.destination_order.kind =
332 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
340 "SpliceDDSTransport::setup:"
341 " Enabling unreliable transport for (%s) datareaders\n",
346 "SpliceDDSTransport::setup:"
347 " Creating datareader for topic (%s)\n",
351 datareader_ = subscriber_->create_datareader(update_topic_,
353 datareader_qos_, NULL, DDS::STATUS_MASK_NONE);
360 if (
int ret = check_handle(datareader_,
361 "DDS::Subscriber::create_datareader (Update)") < 0)
364 dynamic_cast<Knowledge::UpdateDataReader_ptr
>(datareader_.in());
365 if (
int ret = check_handle(update_reader_,
366 "Knowledge::UpdateDataReader_ptr::narrow") < 0)
369 if (!settings_.no_receiving)
371 double hertz = settings_.read_thread_hertz;
378 "UdpTransportReadThread::setup:"
379 " starting %d threads at %f hertz\n",
380 settings_.read_threads, hertz);
382 for (uint32_t i = 0; i < settings_.read_threads; ++i)
384 std::stringstream thread_name;
385 thread_name <<
"read";
388 read_threads_.run(hertz, thread_name.str(),
390 context_, update_reader_, latency_update_writer_, send_monitor_,
391 receive_monitor_, packet_scheduler_));
395 return this->validate_transport();
403 if (!settings_.no_sending)
405 result = prep_send(updates,
"SpliceDDSTransport::send_data:");
411 unsigned long long cur_clock =
context_.get_clock();
413 DDS::ReturnCode_t dds_result;
414 DDS::InstanceHandle_t handle;
416 Knowledge::Update data;
419 Knowledge::seq_oct(result, result, (
unsigned char*)buffer_.get_ptr());
420 data.clock = cur_clock;
421 data.quality = quality;
422 data.updates = DDS::ULong(updates.size());
423 data.originator = DDS::string_dup(id_.c_str());
425 data.ttl = settings_.get_rebroadcast_ttl();
430 "SpliceDDSTransport::send:"
431 " sending multiassignment: %d updates, time=llu, quality=%d\n",
432 data.updates, cur_clock, quality);
434 handle = update_writer_->register_instance(data);
435 dds_result = update_writer_->write(data, handle);
436 result = (long)dds_result;
444 void* handle,
const char* info)
449 "SpliceDDSTransport::check_handle:"
450 " error in %s: Creation failed: invalid handle\n",
460 DDS::ReturnCode_t status,
const char* info)
463 if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
467 "SpliceDDSTransport::check_status:"
468 " error in %s: Creation failed: %s\n",
469 info, get_error_name(status));
475 DDS::ReturnCode_t status)
477 return ret_code_names[status];
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
const ThreadSafeContext * context_
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thr...
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Base class from which all transports must be derived.
virtual int setup(void)
all subclasses should call this method at the end of its setup
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
threads::Threader read_threads_
threads for reading knowledge updates
long send_data(const knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
int setup(void)
Activates this transport.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
void close(void)
Closes this transport.
static const char * topic_names_[]
int check_handle(void *handle, const char *info)
Splice handle checker.
static const char * partition_
int check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
static const char * ret_code_names[]
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
~SpliceDDSTransport()
Destructor.
int reliability(void) const
Accesses reliability setting.
Thread for reading knowledge updates via waitsets.
Holds basic transport settings.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
Copyright(c) 2020 Galois.