MADARA  3.4.1
BasicASIOTransport.cpp
Go to the documentation of this file.
1 
4 
10 
11 #include <iostream>
13 
14 namespace madara
15 {
16 namespace transport
17 {
18 namespace mcast = ip::multicast;
19 
20 // const double BasicASIOTransport::default_read_hertz = 10.0;
21 
24  : Base(id, config, context)
25 {
26  // create a reference to the knowledge base for threading
27  knowledge_.use(context);
28 
29  // set the data plane for the read threads
31 }
32 
34 {
36 }
37 
39 {
40  this->invalidate_transport();
41 
43 
45 
46  socket_.close();
47 }
48 
50 {
51  // call base setup method to initialize certain common variables
52  if (Base::setup() < 0)
53  {
54  return -1;
55  }
56 
57  // resize addresses to be the size of the list of hosts
58  addresses_.clear();
59  addresses_.reserve(this->settings_.hosts.size());
60 
61  if (settings_.hosts.size() == 0)
62  {
64  "BasicASIOTransport::setup:"
65  " No host addresses. Aborting setup.\n");
66  this->invalidate_transport();
67  return -1;
68  }
69 
70  // convert the string host:port into an asio address
71  for (unsigned int i = 0; i < settings_.hosts.size(); ++i)
72  {
73  try
74  {
75  auto addr_parts = utility::parse_address(settings_.hosts[i]);
76 
77  auto addr = ip::address::from_string(addr_parts.first);
78  addresses_.emplace_back(addr, addr_parts.second);
79 
81  "BasicASIOTransport::setup:"
82  " settings address[%d] to %s:%d\n",
83  i, addresses_.back().address().to_string().c_str(),
84  addresses_.back().port());
85  }
86  catch (const boost::system::system_error& e)
87  {
89  "BasicASIOTransport::setup:"
90  " Error parsing address %s: %s\n",
91  settings_.hosts[i].c_str(), e.what());
92  }
93  }
94 
95  if (addresses_.size() == 0)
96  {
98  "BasicASIOTransport::setup:"
99  " No valid addresses. Aborting setup.\n");
100  this->invalidate_transport();
101  return -1;
102  }
103 
104  try
105  {
106  socket_.open(addresses_[0].protocol());
107  }
108  catch (const boost::system::system_error& e)
109  {
111  "BasicASIOTransport::setup:"
112  " Error opening sockets: %s\n",
113  e.what());
114 
115  this->invalidate_transport();
116  return -1;
117  }
118 
119  int ret = setup_sockets();
120  if (ret < 0)
121  {
122  return ret;
123  }
124 
125  ret = setup_read_threads();
126  if (ret < 0)
127  {
128  return ret;
129  }
130 
131  return this->validate_transport();
132 }
133 
// Ensures the given UDP socket's send and receive buffers are each at least
// settings_.queue_length in size, enlarging whichever one is smaller.
//
// @param socket  the socket whose buffer-size options are read and, when
//                needed, raised
// @return 0 on success; -1 if any get_option/set_option call throws
//         boost::system::system_error, in which case the transport is also
//         invalidated
//
// NOTE(review): in this listing the embedded line numbers skip wherever a
// string literal begins (e.g. 147 -> 150), so the opening of each logging
// call is not visible here -- consult the original file for those lines.
134 int BasicASIOTransport::setup_socket(udp::socket& socket)
135 {
     // desired minimum size for both buffers
     // NOTE(review): queue_length is declared as uint32_t; a value that does
     // not fit in int would not survive this conversion -- confirm the range
136  int tar_buff_size(settings_.queue_length);
137 
138  asio::socket_base::send_buffer_size send_buffer_option;
139  asio::socket_base::receive_buffer_size receive_buffer_option;
140 
141  try
142  {
     // query the sizes the socket currently reports
143  socket.get_option(send_buffer_option);
144  int send_buff_size = send_buffer_option.value();
145 
146  socket.get_option(receive_buffer_option);
147  int rcv_buff_size = receive_buffer_option.value();
148 
150  "BasicASIOTransport::setup:"
151  " default socket buff size is send=%d, rcv=%d\n",
152  send_buff_size, rcv_buff_size);
153 
     // only ever grow the send buffer; a larger existing size is kept
154  if (send_buff_size < tar_buff_size)
155  {
157  "BasicASIOTransport::setup:"
158  " setting send buff size to settings.queue_length (%d)\n",
159  tar_buff_size);
160 
161  send_buffer_option = tar_buff_size;
162  socket.set_option(send_buffer_option);
163 
     // read the option back so the value reported below is the one the
     // socket returns, not merely the one requested
164  socket.get_option(send_buffer_option);
165  send_buff_size = send_buffer_option.value();
166 
168  "BasicASIOTransport::setup:"
169  " current socket buff size is send=%d, rcv=%d\n",
170  send_buff_size, rcv_buff_size);
171  }
172 
     // same grow-only treatment for the receive buffer
173  if (rcv_buff_size < tar_buff_size)
174  {
176  "BasicASIOTransport::setup:"
177  " setting rcv buff size to settings.queue_length (%d)\n",
178  tar_buff_size);
179 
180  receive_buffer_option = tar_buff_size;
181  socket.set_option(receive_buffer_option);
182 
     // read the option back, as done for the send buffer above
183  socket.get_option(receive_buffer_option);
184  rcv_buff_size = receive_buffer_option.value();
185 
187  "BasicASIOTransport::setup:"
188  " current socket buff size is send=%d, rcv=%d\n",
189  send_buff_size, rcv_buff_size);
190  }
191  }
192  catch (const boost::system::system_error& e)
193  {
195  "BasicASIOTransport::setup:"
196  " Error setting up sockets: %s\n",
197  e.what());
198 
     // any failure while reading or setting options marks the whole
     // transport invalid and is reported to the caller as -1
199  this->invalidate_transport();
200  return -1;
201  }
202 
203  return 0;
204 }
205 
207 {
208  return setup_socket(socket_);
209 }
210 
212 {
213  // return setup_socket (write_socket_);
214  return 0;
215 }
216 
218 {
219  int ret = setup_read_socket();
220  if (ret < 0)
221  {
222  return ret;
223  }
224  ret = setup_write_socket();
225  if (ret < 0)
226  {
227  return ret;
228  }
229 
230  return 0;
231 }
232 
234 {
235  if (!settings_.no_receiving)
236  {
237  double hertz = settings_.read_thread_hertz;
238  if (hertz < 0.0)
239  {
240  // we need to maintain backwards compatibility
241  // people should be capable of bursting reads especially
242  hertz = 0.0;
243  }
244 
246  "BasicASIOTransport::setup:"
247  " starting %d threads at %f hertz\n",
248  settings_.read_threads, hertz);
249 
250  for (uint32_t i = 0; i < settings_.read_threads; ++i)
251  {
252  std::stringstream thread_name;
253  thread_name << "read";
254  thread_name << i;
255 
257  "BasicASIOTransport::setup:"
258  " starting thread %s at %f hertz\n",
259  thread_name.str().c_str(), hertz);
260  setup_read_thread(hertz, thread_name.str());
261  }
262  }
263 
264  return 0;
265 }
266 }
267 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void terminate(const std::string &name)
Requests a specific thread to terminate.
Definition: Threader.cpp:174
bool wait(const std::string &name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:194
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:168
Base class from which all transports must be derived.
Definition: Transport.h:46
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
QoSTransportSettings settings_
Definition: Transport.h:132
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:135
virtual int setup(void)
All subclasses should call this method at the end of their setup.
Definition: Transport.cpp:32
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:31
threads::Threader read_threads_
threads for reading knowledge updates
virtual int setup_read_thread(double hertz, const std::string &name)=0
BasicASIOTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config)
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
udp::socket socket_
underlying socket
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
int setup() override
All subclasses should call this method at the end of their setup.
void close() override
Closes this transport.
virtual int setup_socket(udp::socket &socket)
Holds basic transport settings.
bool no_receiving
if true, never receive over transport
std::vector< std::string > hosts
Host information for transports that require it.
uint32_t queue_length
Length of the buffer used to store history of events.
uint32_t read_threads
the number of read threads to start
double read_thread_hertz
Number of valid messages allowed to be received per second.
constexpr string_t string
std::pair< std::string, uint16_t > parse_address(std::string addr)
Definition: Utility.cpp:797
Copyright(c) 2020 Galois.