MADARA  3.4.1
UdpRegistryClient.cpp
Go to the documentation of this file.
3 
7 
8 #include <iostream>
9 
10 namespace madara
11 {
12 namespace transport
13 {
16  bool launch_transport)
17  : UdpTransport(id, context, config, false)
18 {
20  "domain." + config.write_domain + ".endpoints", knowledge_);
21 
22  if (launch_transport)
23  setup();
24 }
25 
27 {
28  // call base setup method to initialize certain common variables
29  int ret = UdpTransport::setup();
30  if (ret < 0)
31  {
32  return ret;
33  }
34 
35  if (addresses_.size() > 0)
36  {
37  servers_.clear();
38 
39  // UdpTransport::setup puts hosts into addresses_; move all but first to
40  // servers_ instead
41  for (size_t i = 1; i < addresses_.size(); ++i)
42  {
44  "UdpRegistryClient::setup:"
45  " adding server %s:%d to registry lookup list\n",
46  addresses_[i].address().to_string().c_str(),
47  (int)addresses_[i].port());
48 
49  servers_.emplace_back(std::move(addresses_[i]));
50  }
51 
52  addresses_.resize(1);
53  }
54 
55  return this->validate_transport();
56 }
57 
59 {
60  const char* print_prefix = "UdpRegistryClient::register";
61 
62  long result(0);
63  //uint64_t bytes_sent = 0;
64 
65  if (servers_.size() > 0)
66  {
67  // Register messages always use the message header to include domain
68  MessageHeader header;
69  header.type = transport::REGISTER;
71  header.originator, this->id_.c_str(), sizeof(header.originator));
72  utility::strncpy_safe(header.domain, this->settings_.write_domain.c_str(),
73  sizeof(header.domain));
74  header.updates = 0;
75  header.clock = context_.get_clock();
76  // compute size of this header
77  header.size = header.encoded_size();
78 
79  int64_t buffer_remaining = settings_.queue_length;
80 
81  char* update = header.write(buffer_.get_ptr(), buffer_remaining);
82 
83  result = (long)(update - buffer_.get_ptr());
84 
85  for (const auto& server : servers_)
86  {
88  "%s:"
89  " Sending register of size %d to %s:%d\n",
90  print_prefix, (int)result, server.address().to_string().c_str(),
91  server.port());
92 
93  ssize_t actual_sent = send_buffer(server, buffer_.get_ptr(), result);
94 
95  if (actual_sent > 0)
96  {
97  //bytes_sent += actual_sent;
98 
100  "%s:"
101  " Sent register of size %d to %s\n",
102  print_prefix, (int)actual_sent,
103  server.address().to_string().c_str(), server.port());
104 
105  send_monitor_.add((uint32_t)actual_sent);
106  }
107  }
108  }
109  else
110  {
112  "%s:"
113  " ERROR: no servers available for sending. Failed to register.\n");
114  }
115 }
116 
118  const knowledge::KnowledgeMap& orig_updates)
119 {
120  if (!settings_.no_sending)
121  {
122  this->endpoints_.sync_keys();
123 
124  std::vector<std::string> hosts;
125  this->addresses_.resize(1);
126  this->endpoints_.keys(hosts);
127 
128  for (auto& host : hosts)
129  {
130  auto addr_parts = utility::parse_address(std::move(host));
131  auto addr = ip::address::from_string(addr_parts.first);
132  addresses_.emplace_back(addr, addr_parts.second);
133 
135  "UdpRegistryClient::send_data:"
136  " adding %s:%d\n",
137  addresses_.back().address().to_string().c_str(),
138  addresses_.back().port());
139  }
140 
141  send_register();
142  }
143  return UdpTransport::send_data(orig_updates);
144 }
145 }
146 }
#define madara_logger_log(loggering, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
This class stores variables and their values for use by any entity needing state information in a thr...
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void set_name(const std::string &var_name, KnowledgeBase &knowledge, bool sync=true)
Sets the variable name that this refers to.
Definition: Map.cpp:411
std::vector< std::string > sync_keys(void)
Syncs the keys from the knowledge base.
Definition: Map.cpp:206
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:501
void add(uint64_t size)
Adds a message to the monitor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:153
QoSTransportSettings settings_
Definition: Transport.h:132
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:135
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:144
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
int setup() override
all subclasses should call this method at the end of its setup
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
char domain[32]
the domain that this message is intended for
char originator[64]
the originator of the message (host:port)
uint32_t updates
the number of knowledge variable updates in the message
uint32_t type
the type of message
uint64_t clock
the clock of the sender when the message was generated
uint64_t size
the size of this header plus the updates
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining.
Holds basic transport settings.
uint32_t queue_length
Length of the buffer used to store history of events.
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
int setup(void) override
all subclasses should call this method at the end of its setup
std::vector< udp::endpoint > servers_
registry servers
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
knowledge::containers::Map endpoints_
UdpRegistryClient(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
void send_register(void)
Sends register messages to all servers.
UDP-based transport for knowledge.
Definition: UdpTransport.h:39
long send_data(const madara::knowledge::KnowledgeMap &updates) override
Sends a list of knowledge updates to listeners.
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Sends a buffer to a host endpoint.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:64
constexpr string_t string
::std::map< std::string, KnowledgeRecord > KnowledgeMap
MADARA_EXPORT void strncpy_safe(char *dst, const char *src, size_t dst_size)
Performs a strncpy in a way that will compile without warnings.
Definition: Utility.cpp:376
std::pair< std::string, uint16_t > parse_address(std::string addr)
Definition: Utility.cpp:797
Copyright(c) 2020 Galois.