21 "DDS_RETCODE_UNSUPPORTED",
22 "DDS_RETCODE_BAD_PARAMETER",
23 "DDS_RETCODE_PRECONDITION_NOT_MET",
24 "DDS_RETCODE_OUT_OF_RESOURCES",
25 "DDS_RETCODE_NOT_ENABLED",
26 "DDS_RETCODE_IMMUTABLE_POLICY",
27 "DDS_RETCODE_INCONSISTENT_POLICY",
28 "DDS_RETCODE_ALREADY_DELETED",
29 "DDS_RETCODE_TIMEOUT",
30 "DDS_RETCODE_NO_DATA",
31 "DDS_RETCODE_ILLEGAL_OPERATION" };
39 :
madara::transport::
Base (id, config, context),
40 domain_ (0), domain_factory_ (0),
41 domain_participant_ (0), publisher_ (0), subscriber_ (0),
42 datawriter_ (0), datareader_ (0),
43 update_writer_ (0), update_reader_ (0),
118 DDS::ReturnCode_t status;
126 "SpliceDDSTransport::setup:" \
127 " Creating a participant for topic (%s)\n",
131 "SpliceDDSTransport::setup:" \
132 " Participant settings are being read from the OSPL_URI environment" 138 domain_factory_->get_default_participant_qos (
part_qos_);
147 "\nSpliceDDSTransport::setup:" \
148 " splice daemon not running. Try 'ospl start'...\n");
157 topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
159 topic_qos_.resource_limits.max_samples_per_instance =
163 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
171 "SpliceDDSTransport::setup:" \
172 " Registering type support\n");
177 if (
int ret =
check_status(status,
"Knowledge::UpdateTypeSupport::register_type") < 0)
186 "SpliceDDSTransport::setup:" \
187 " Setting up knowledge domain via topic (%s)\n",
197 "DDS::DomainParticipant::create_topic (KnowledgeUpdate)") < 0)
203 if (
int ret =
check_status(status,
"DDS::DomainParticipant::get_default_publisher_qos") < 0)
209 pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
210 pub_qos_.presentation.coherent_access =
true;
211 pub_qos_.presentation.ordered_access =
false;
216 "SpliceDDSTransport::setup:" \
217 " Creating publisher for topic (%s)\n",
224 pub_qos_, NULL, DDS::STATUS_MASK_NONE);
230 if (
int ret =
check_status(status,
"DDS::DomainParticipant::get_default_subscriber_qos") < 0)
236 sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
237 sub_qos_.presentation.coherent_access =
true;
238 sub_qos_.presentation.ordered_access =
false;
242 "SpliceDDSTransport::setup:" \
243 " Creating subscriber for topic (%s)\n",
250 sub_qos_, NULL, DDS::STATUS_MASK_NONE);
257 "SpliceDDSTransport::setup:" \
258 " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
270 "SpliceDDSTransport::setup:" \
271 " Enabling reliable transport for (%s) datawriters\n",
280 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
285 "SpliceDDSTransport::setup:" \
286 " Enabling unreliable transport for (%s) datawriters\n",
291 "SpliceDDSTransport::setup:" \
292 " Creating datawriter for topic (%s)\n",
298 if (
int ret =
check_handle(datawriter_,
"DDS::Publisher::create_datawriter (Update)") < 0)
300 update_writer_ =
dynamic_cast<Knowledge::UpdateDataWriter_ptr
> (datawriter_.in ());
318 if (
int ret =
check_status(status,
"DDS::Subscriber::get_default_datareader_qos") < 0)
326 "SpliceDDSTransport::setup:" \
327 " Enabling reliable transport for (%s) datareaders\n",
334 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
342 "SpliceDDSTransport::setup:" \
343 " Enabling unreliable transport for (%s) datareaders\n",
348 "SpliceDDSTransport::setup:" \
349 " Creating datareader for topic (%s)\n",
362 if (
int ret =
check_handle(datareader_,
"DDS::Subscriber::create_datareader (Update)") < 0)
364 update_reader_ =
dynamic_cast<Knowledge::UpdateDataReader_ptr
>(datareader_.in ());
377 "UdpTransportReadThread::setup:" \
383 std::stringstream thread_name;
384 thread_name <<
"read";
405 result =
prep_send (updates,
"SpliceDDSTransport::send_data:");
413 DDS::ReturnCode_t dds_result;
414 DDS::InstanceHandle_t handle;
416 Knowledge::Update data;
418 data.buffer = Knowledge::seq_oct (result, result, (
unsigned char *)
buffer_.
get_ptr ());
419 data.clock = cur_clock;
420 data.quality = quality;
421 data.updates = DDS::ULong (updates.size ());
422 data.originator = DDS::string_dup (
id_.c_str ());
429 "SpliceDDSTransport::send:" \
430 " sending multiassignment: %d updates, time=llu, quality=%d\n",
431 data.updates, cur_clock, quality);
435 result = (long)dds_result;
449 "SpliceDDSTransport::check_handle:" \
450 " error in %s: Creation failed: invalid handle\n", info);
463 if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
467 "SpliceDDSTransport::check_status:" \
468 " error in %s: Creation failed: %s\n",
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
QoSTransportSettings settings_
DDS::SubscriberQos sub_qos_
DDS::DataReader_var datareader_
DDS::DataWriterQos datawriter_qos_
int reliability(void) const
Accesses reliability setting.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...
int check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
This class stores variables and their values for use by any entity needing state information in a thr...
DDS::Publisher_var publisher_
DDS::DomainParticipant_var domain_participant_
Knowledge::UpdateDataReader_var update_reader_
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
double read_thread_hertz
number of valid messages allowed to be received per second.
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
Knowledge::UpdateDataWriter_var update_writer_
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
DDS::DomainParticipantFactory_var domain_factory_
DDS::PublisherQos pub_qos_
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
~SpliceDDSTransport()
Destructor.
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
static const char * partition_
DDS::Topic_var update_topic_
long send_data(const knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
static const char * ret_code_names[]
DDS::DataWriter_var datawriter_
threads::Threader read_threads_
threads for reading knowledge updates
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
std::string write_domain
All class members are accessible to users for easy setup.
volatile bool shutting_down_
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
bool no_sending
if true, never send over transport
Knowledge::UpdateDataWriter_var latency_update_writer_
std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
DDS::DomainParticipantQos part_qos_
DDS::Subscriber_var subscriber_
Knowledge::UpdateTypeSupport update_type_support_
int setup(void)
Activates this transport.
Thread for reading knowledge updates via waitsets.
const std::string id_
host:port identifier of this process
DDS::DataWriter_var latencywriter_
uint32_t read_threads
the number of read threads to start
void close(void)
Closes this transport.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
DDS::DataReaderQos datareader_qos_
static const char * topic_names_[]
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void set_data_plane(knowledge::KnowledgeBase data_plane)
Sets the data plane for new threads.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Base class from which all transports must be derived.
long prep_send(const knowledge::VariableReferenceMap &orig_updates, const char *print_prefix)
Preps a message for sending.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int check_handle(void *handle, const char *info)
Splice handle checker.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.