MADARA  3.2.3
BasicASIOTransport.cpp
Go to the documentation of this file.
1 
4 
10 
11 #include <iostream>
13 
14 namespace madara { namespace transport {
15 
16 namespace mcast = ip::multicast;
17 
18 //const double BasicASIOTransport::default_read_hertz = 10.0;
19 
22  TransportSettings & config)
23 : Base (id, config, context)
24 {
25  // create a reference to the knowledge base for threading
26  knowledge_.use (context);
27 
28  // set the data plane for the read threads
30 }
31 
33 {
35 }
36 
37 void
39 {
40  this->invalidate_transport ();
41 
43 
45 
46  socket_.close ();
47 }
48 
49 int
51 {
52  // call base setup method to initialize certain common variables
53  if (Base::setup () < 0) {
54  return -1;
55  }
56 
57  // resize addresses to be the size of the list of hosts
58  addresses_.clear ();
59  addresses_.reserve (this->settings_.hosts.size ());
60 
61  if (settings_.hosts.size () == 0) {
63  "BasicASIOTransport::setup:" \
64  " No host addresses. Aborting setup.\n");
65  this->invalidate_transport ();
66  return -1;
67  }
68 
69  // convert the string host:port into an asio address
70  for (unsigned int i = 0; i < settings_.hosts.size (); ++i)
71  {
72  try {
73  auto addr_parts = utility::parse_address(settings_.hosts[i]);
74 
75  auto addr = ip::address::from_string (addr_parts.first);
76  addresses_.emplace_back (addr, addr_parts.second);
77 
79  "BasicASIOTransport::setup:" \
80  " settings address[%d] to %s:%d\n", i,
81  addresses_.back ().address ().to_string ().c_str (),
82  addresses_.back ().port ());
83  } catch (const boost::system::system_error &e) {
85  "BasicASIOTransport::setup:" \
86  " Error parsing address %s: %s\n", settings_.hosts[i].c_str (), e.what ());
87  }
88  }
89 
90  if (addresses_.size () == 0) {
92  "BasicASIOTransport::setup:" \
93  " No valid addresses. Aborting setup.\n");
94  this->invalidate_transport ();
95  return -1;
96  }
97 
98  try {
99  socket_.open(addresses_[0].protocol ());
100  } catch (const boost::system::system_error &e) {
102  "BasicASIOTransport::setup:" \
103  " Error opening sockets: %s\n", e.what ());
104 
105  this->invalidate_transport ();
106  return -1;
107  }
108 
109  int ret = setup_sockets ();
110  if (ret < 0) {
111  return ret;
112  }
113 
114  ret = setup_read_threads ();
115  if (ret < 0) {
116  return ret;
117  }
118 
119  return this->validate_transport ();
120 }
121 
122 int
123 BasicASIOTransport::setup_socket (udp::socket &socket)
124 {
125  int tar_buff_size (settings_.queue_length);
126 
127  asio::socket_base::send_buffer_size send_buffer_option;
128  asio::socket_base::receive_buffer_size receive_buffer_option;
129 
130  try {
131  socket.get_option(send_buffer_option);
132  int send_buff_size = send_buffer_option.value();
133 
134  socket.get_option(receive_buffer_option);
135  int rcv_buff_size = receive_buffer_option.value();
136 
138  "BasicASIOTransport::setup:" \
139  " default socket buff size is send=%d, rcv=%d\n",
140  send_buff_size, rcv_buff_size);
141 
142  if (send_buff_size < tar_buff_size)
143  {
145  "BasicASIOTransport::setup:" \
146  " setting send buff size to settings.queue_length (%d)\n",
147  tar_buff_size);
148 
149  send_buffer_option = tar_buff_size;
150  socket.set_option (send_buffer_option);
151 
152  socket.get_option (send_buffer_option);
153  send_buff_size = send_buffer_option.value();
154 
156  "BasicASIOTransport::setup:" \
157  " current socket buff size is send=%d, rcv=%d\n",
158  send_buff_size, rcv_buff_size);
159  }
160 
161  if (rcv_buff_size < tar_buff_size)
162  {
164  "BasicASIOTransport::setup:" \
165  " setting rcv buff size to settings.queue_length (%d)\n",
166  tar_buff_size);
167 
168  receive_buffer_option = tar_buff_size;
169  socket.set_option (receive_buffer_option);
170 
171  socket.get_option (receive_buffer_option);
172  rcv_buff_size = receive_buffer_option.value();
173 
175  "BasicASIOTransport::setup:" \
176  " current socket buff size is send=%d, rcv=%d\n",
177  send_buff_size, rcv_buff_size);
178  }
179  } catch (const boost::system::system_error &e) {
181  "BasicASIOTransport::setup:" \
182  " Error setting up sockets: %s\n", e.what ());
183 
184  this->invalidate_transport ();
185  return -1;
186  }
187 
188  return 0;
189 }
190 
191 int
193 {
194  return setup_socket (socket_);
195 }
196 
197 int
199 {
200  //return setup_socket (write_socket_);
201  return 0;
202 }
203 
204 int
206 {
207  int ret = setup_read_socket ();
208  if (ret < 0) {
209  return ret;
210  }
211  ret = setup_write_socket ();
212  if (ret < 0) {
213  return ret;
214  }
215 
216  return 0;
217 }
218 
219 int
221 {
222  if (!settings_.no_receiving)
223  {
224  double hertz = settings_.read_thread_hertz;
225  if (hertz < 0.0)
226  {
227  // we need to maintain backwards compatibility
228  // people should be capable of bursting reads especially
229  hertz = 0.0;
230  }
231 
233  "BasicASIOTransport::setup:" \
234  " starting %d threads at %f hertz\n", settings_.read_threads,
235  hertz);
236 
237  for (uint32_t i = 0; i < settings_.read_threads; ++i)
238  {
239  std::stringstream thread_name;
240  thread_name << "read";
241  thread_name << i;
242 
244  "BasicASIOTransport::setup:" \
245  " starting thread %s at %f hertz\n",
246  thread_name.str ().c_str (), hertz);
247  setup_read_thread (hertz, thread_name.str ());
248  }
249  }
250 
251  return 0;
252 }
253 
254 } }
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
QoSTransportSettings settings_
Definition: Transport.h:133
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:33
std::pair< std::string, uint16_t > parse_address(std::string addr)
Definition: Utility.cpp:789
void close() override
Closes this transport.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
BasicASIOTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config)
udp::socket socket_
underlying socket
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
static struct madara::knowledge::tags::string_t string
bool no_receiving
if true, never receive over transport
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
virtual int setup_socket(udp::socket &socket)
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:180
threads::Threader read_threads_
threads for reading knowledge updates
int setup() override
all subclasses should call this method at the end of its setup
void set_data_plane(knowledge::KnowledgeBase data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:173
virtual int setup_read_thread(double hertz, const std::string &name)=0
Copyright (c) 2015 Carnegie Mellon University.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:30
Base class from which all transports must be derived.
Definition: Transport.h:45
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:201
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:136
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.