15 Knowledge::UpdateDataReader_ptr& update_reader,
16 Knowledge::UpdateDataWriter_ptr& update_writer,
22 update_reader_(update_reader),
23 update_writer_(update_writer),
24 send_monitor_(send_monitor),
25 receive_monitor_(receive_monitor),
26 packet_scheduler_(packet_scheduler)
36 if (settings_.queue_length > 0)
37 buffer_ =
new char[settings_.queue_length];
42 if (settings_.on_data_received_logic.length() != 0)
44 #ifndef _MADARA_NO_KARL_
46 "UdpTransportReadThread::init:"
47 " setting rules to %s\n",
48 settings_.on_data_received_logic.c_str());
51 on_data_received_ =
context_->compile(settings_.on_data_received_logic);
57 "UdpTransportReadThread::init:"
58 " no permanent rules were set\n");
68 int64_t buffer_remaining = (int64_t)settings_.queue_length;
69 char* buffer = buffer_.get_ptr();
71 settings_, print_prefix, header, records, packet_scheduler_);
75 ssize_t bytes_sent(result +
sizeof(Knowledge::Update));
76 DDS::ReturnCode_t dds_result;
77 DDS::InstanceHandle_t handle;
83 unsigned long long cur_clock =
context_->get_clock();
85 Knowledge::Update data;
88 Knowledge::seq_oct(result, result, (
unsigned char*)buffer_.get_ptr());
89 data.clock = cur_clock;
90 data.quality = quality;
91 data.updates = DDS::ULong(records.size());
92 data.originator = DDS::string_dup(id_.c_str());
94 data.ttl = settings_.get_rebroadcast_ttl();
98 handle = update_writer_->register_instance(data);
99 dds_result = update_writer_->write(data, handle);
103 " Sent packet of size %d\n",
104 print_prefix, bytes_sent);
106 send_monitor_.add((uint32_t)bytes_sent);
110 " Send bandwidth = %d B/s\n",
111 print_prefix, send_monitor_.get_bytes_per_second());
117 DDS::SampleInfoSeq_var infoList =
new DDS::SampleInfoSeq;
118 DDS::ReturnCode_t dds_result;
120 DDS::Boolean result =
false;
121 const char* print_prefix =
"SpliceReadThread::svc";
122 Knowledge::UpdateSeq_var update_data_list_ =
new Knowledge::UpdateSeq;
124 DDS::WaitSet waitset_;
125 DDS::StatusCondition_ptr condition_;
127 condition_ = update_reader_->get_statuscondition();
128 condition_->set_enabled_statuses(DDS::DATA_AVAILABLE_STATUS);
129 waitset_.attach_condition(condition_);
131 ::DDS::Duration_t wait_time;
133 wait_time.nanosec = 0;
138 "%s: entering processing loop.\n", print_prefix);
143 DDS::ConditionSeq_var conditionList =
new DDS::ConditionSeq();
144 result = waitset_.wait(conditionList.inout(), wait_time);
147 "%s: entering a take on the DDS reader.\n", print_prefix);
149 dds_result = update_reader_->take(update_data_list_, infoList, 20,
150 DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
152 amount = update_data_list_->length();
156 for (
int i = 0; i < amount; ++i)
161 if (!update_data_list_[i].originator.val())
166 "%s: discarding null originator event.\n", print_prefix);
174 "%s: discarding non-assignment event.\n", print_prefix);
184 update_data_list_[i].buffer.length(), id_, *
context_, settings_,
185 send_monitor_, receive_monitor_, rebroadcast_records,
186 on_data_received_, print_prefix,
"", header);
190 if (header->
ttl > 0 && rebroadcast_records.size() > 0 &&
191 settings_.get_participant_ttl() > 0)
194 header->
ttl = std::min(settings_.get_participant_ttl(), header->
ttl);
196 rebroadcast(print_prefix, header, rebroadcast_records);
205 dds_result = update_reader_->return_loan(update_data_list_, infoList);
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
const ThreadSafeContext * context_
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
This class provides a distributed knowledge base to users.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides monitoring capability of a transport's bandwidth.
Provides scheduler for dropping packets.
void cleanup(void)
Cleanup function called by thread manager.
SpliceReadThread(const std::string &id, const TransportSettings &settings, knowledge::ThreadSafeContext &context, Knowledge::UpdateDataReader_ptr &update_reader, Knowledge::UpdateDataWriter_ptr &update_writer, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
void run(void)
The main loop internals for the read thread.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
Holds basic transport settings.
Provides functions and classes for the distributed knowledge base.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
Provides knowledge logging services to files and terminals.
int process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
int prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides utility functions and classes for common tasks and needs.
int64_t get_time(void)
Returns a time of day in nanoseconds If simtime feature is enabled, this may be simulation time inste...