16 Knowledge::UpdateDataReader_ptr & update_reader,
17 Knowledge::UpdateDataWriter_ptr & update_writer,
21 : id_ (id), settings_ (settings), context_ (&context),
22 update_reader_ (update_reader),
23 update_writer_ (update_writer),
24 send_monitor_ (send_monitor),
25 receive_monitor_ (receive_monitor),
26 packet_scheduler_ (packet_scheduler)
46 #ifndef _MADARA_NO_KARL_ 48 "UdpTransportReadThread::init:" \
49 " setting rules to %s\n",
54 #endif // _MADARA_NO_KARL_ 59 "UdpTransportReadThread::init:" \
60 " no permanent rules were set\n");
72 const char * print_prefix,
85 ssize_t bytes_sent (result +
sizeof (Knowledge::Update));
86 DDS::ReturnCode_t dds_result;
87 DDS::InstanceHandle_t handle;
95 Knowledge::Update data;
97 data.buffer = Knowledge::seq_oct (result, result,
99 data.clock = cur_clock;
100 data.quality = quality;
101 data.updates = DDS::ULong (records.size ());
102 data.originator = DDS::string_dup(
id_.c_str ());
113 " Sent packet of size %d\n",
121 " Send bandwidth = %d B/s\n",
130 DDS::SampleInfoSeq_var infoList =
new DDS::SampleInfoSeq;
131 DDS::ReturnCode_t dds_result;
133 DDS::Boolean result =
false;
134 const char * print_prefix =
"SpliceReadThread::svc";
135 Knowledge::UpdateSeq_var update_data_list_ =
new Knowledge::UpdateSeq;
137 DDS::WaitSet waitset_;
138 DDS::StatusCondition_ptr condition_;
141 condition_->set_enabled_statuses (DDS::DATA_AVAILABLE_STATUS);
142 waitset_.attach_condition (condition_);
144 ::DDS::Duration_t wait_time;
146 wait_time.nanosec = 0;
151 "%s: entering processing loop.\n", print_prefix);
156 DDS::ConditionSeq_var conditionList =
new DDS::ConditionSeq();
157 result = waitset_.wait (conditionList.inout (), wait_time);
160 "%s: entering a take on the DDS reader.\n", print_prefix);
162 dds_result =
update_reader_->take (update_data_list_, infoList, 20,
163 DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
165 amount = update_data_list_->length ();
169 for (
int i = 0; i < amount; ++i)
174 if (!update_data_list_[i].originator.val ())
179 "%s: discarding null originator event.\n", print_prefix);
187 "%s: discarding non-assignment event.\n", print_prefix);
197 update_data_list_[i].buffer.length (),
id_, *
context_,
204 if (header->
ttl > 0 && rebroadcast_records.size () > 0 &&
205 settings_.get_participant_ttl () > 0)
208 header->
ttl = std::min (
209 settings_.get_participant_ttl (), header->
ttl);
211 rebroadcast (print_prefix, header, rebroadcast_records);
221 dds_result =
update_reader_->return_loan (update_data_list_, infoList);
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
SpliceReadThread(const std::string &id, const TransportSettings &settings, knowledge::ThreadSafeContext &context, Knowledge::UpdateDataReader_ptr &update_reader, Knowledge::UpdateDataWriter_ptr &update_writer, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
::Knowledge::UpdateDataWriter_var update_writer_
The DDS data writer that we can write to.
const QoSTransportSettings settings_
Transport settings.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
int64_t get_time(void)
Returns a time of day in nanoseconds. If the simtime feature is enabled, this may be simulation time inste...
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
madara::utility::ScopedArray< char > buffer_
buffer for receiving
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
T * get_ptr(void)
get the underlying pointer
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
::Knowledge::UpdateDataReader_var update_reader_
The DDS data reader that we will take from.
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
void add(uint64_t size)
Adds a message to the monitor.
void cleanup(void)
Cleanup function called by thread manager.
std::string on_data_received_logic
logic to be evaluated after every successful update
uint32_t queue_length
Length of the buffer used to store history of events.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
Provides utility functions and classes for common tasks and needs.
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
const std::string id_
Unique identifier for this entity (e.g., host:port)
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
knowledge::ThreadSafeContext * context_
The knowledge context that we will be updating.
void run(void)
The main loop internals for the read thread.