MADARA  3.2.3
UdpRegistryClient.cpp
Go to the documentation of this file.
3 
7 
8 
9 #include <iostream>
10 
11 namespace madara { namespace transport {
12 
15  TransportSettings & config, bool launch_transport)
16  : UdpTransport (id, context, config, false)
17 {
 // NOTE(review): the opening line(s) of this constructor's signature are not
 // present in this listing (the embedded numbering jumps from 12 to 15).
 //
 // The base UdpTransport is always constructed with `false` as its last
 // argument (presumably so the base does not launch itself -- TODO confirm);
 // launching is instead decided below via launch_transport.

 // Bind the endpoints_ map to the "domain.<write_domain>.endpoints" variable
 // in knowledge_.
18  endpoints_.set_name ("domain." + config.write_domain + ".endpoints", knowledge_);
19 
 // Run this class's setup only when the caller asked for it.
20  if (launch_transport)
21  setup ();
22 }
23 
// Sets up the registry client on top of UdpTransport::setup.
//
// After the base setup succeeds, every entry of addresses_ except the first is
// moved into servers_ and addresses_ is cut down to its first entry.
//
// Returns the base setup's result unchanged when it is negative; otherwise
// returns the result of validate_transport ().
//
// NOTE(review): the line carrying this function's qualified name (embedded
// line 25) is not present in this listing.
24 int
26 {
27  // call base setup method to initialize certain common variables
28  int ret = UdpTransport::setup ();
29  if (ret < 0) {
30  return ret;
31  }
32 
 // With an empty addresses_ nothing is touched: servers_ keeps whatever it
 // already held.
33  if (addresses_.size () > 0)
34  {
35  servers_.clear ();
36 
37  // UdpTransport::setup puts hosts into addresses_; move all but first to
38  // servers_ instead
39  for (size_t i = 1; i < addresses_.size (); ++i)
40  {
 // NOTE(review): the opening line of this logging call (embedded line 41)
 // is not present in this listing; only its arguments are visible.
42  "UdpRegistryClient::setup:" \
43  " adding server %s:%d to registry lookup list\n",
44  addresses_[i].address ().to_string ().c_str (),
45  (int)addresses_[i].port ());
46 
 // The element is logged before it is moved from, so the log reads valid
 // data.
47  servers_.emplace_back (std::move(addresses_[i]));
48  }
49 
 // Drop the moved-from tail; only the first address remains.
50  addresses_.resize (1);
51  }
52 
53  return this->validate_transport ();
54 }
55 
// Sends a REGISTER message (header only, zero updates) to every endpoint in
// servers_. When servers_ is empty, only an error is logged.
//
// NOTE(review): the line carrying this function's qualified name (embedded
// line 57) and the opening line of each logging call (embedded lines 85, 98,
// 110) are not present in this listing.
56 void
58 {
59  const char * print_prefix = "UdpRegistryClient::register";
60 
61  long result (0);
 // NOTE(review): bytes_sent is accumulated below but never read in the lines
 // visible here.
62  uint64_t bytes_sent = 0;
63 
64  if (servers_.size () > 0)
65  {
66  // Register messages always use the message header to include domain
67  MessageHeader header;
68  header.type = transport::REGISTER;
 // NOTE(review): strncpy with size - 1 never writes the last byte, so the
 // strings are terminated only if MessageHeader already zero-fills
 // originator/domain -- TODO confirm against MessageHeader's constructor.
69  strncpy (header.originator, this->id_.c_str (), sizeof (header.originator) - 1);
70  strncpy (header.domain, this->settings_.write_domain.c_str (),
71  sizeof (header.domain) - 1);
72  header.updates = 0;
73  header.clock = context_.get_clock ();
74  // compute size of this header
75  header.size = header.encoded_size ();
76 
 // Room available to header.write, taken from settings_.queue_length.
77  int64_t buffer_remaining = settings_.queue_length;
78 
79  char * update = header.write (buffer_.get_ptr (), buffer_remaining);
80 
 // Distance from the start of buffer_ to the pointer returned by write;
 // used below as the number of bytes to send.
81  result = (long)(update - buffer_.get_ptr ());
82 
 // The same encoded header is sent to each server in turn.
83  for (const auto &server : servers_)
84  {
86  "%s:" \
87  " Sending register of size %d to %s:%d\n",
88  print_prefix, (int)result,
89  server.address ().to_string ().c_str (),
90  server.port ());
91 
92  ssize_t actual_sent = send_buffer(server, buffer_.get_ptr (), result);
93 
 // Only positive results are counted; a zero or negative result is skipped
 // without any log or retry.
94  if (actual_sent > 0)
95  {
96  bytes_sent += actual_sent;
97 
 // NOTE(review): the format below has a single %s for the address, yet
 // server.port () is also passed; the port has no conversion specifier
 // (compare the "%s:%d" used in the "Sending" message above).
99  "%s:" \
100  " Sent register of size %d to %s\n",
101  print_prefix, (int)actual_sent,
102  server.address ().to_string ().c_str (), server.port ());
103 
104  send_monitor_.add ((uint32_t)actual_sent);
105  }
106  }
107  }
108  else
109  {
 // NOTE(review): this format starts with "%s:" but no argument follows the
 // format string in the visible lines; print_prefix appears to be missing,
 // which would make the %s read an unspecified argument. Confirm against
 // the full source and pass print_prefix.
111  "%s:" \
112  " ERROR: no servers available for sending. Failed to register.\n");
113  }
114 }
115 
// Sends knowledge updates after refreshing the destination list from the
// endpoints_ map.
//
// Unless settings_.no_sending is set, this re-syncs the keys of endpoints_,
// rebuilds addresses_ as its first entry followed by one endpoint per key, and
// calls send_register (). In every case it then delegates to
// UdpTransport::send_data with orig_updates and returns that result.
//
// NOTE(review): the line carrying this function's qualified name (embedded
// line 117) is not present in this listing.
116 long
118  const knowledge::VariableReferenceMap & orig_updates)
119 {
120  if (!settings_.no_sending)
121  {
122  this->endpoints_.sync_keys ();
123 
124  std::vector <std::string> hosts;
 // Keep only the first address; the rest are re-derived from the keys.
 // NOTE(review): if addresses_ is empty here, resize (1) would grow it with
 // a default-constructed endpoint -- confirm setup () always leaves one
 // entry.
125  this->addresses_.resize (1);
126  this->endpoints_.keys (hosts);
127 
 // Each key is parsed into an (address, port) pair by
 // utility::parse_address. The key string is moved from, which is fine
 // because hosts is local and not read again.
128  for (auto &host : hosts)
129  {
130  auto addr_parts = utility::parse_address (std::move(host));
 // NOTE(review): behavior for a key that is not a valid IP literal depends
 // on ip::address::from_string -- TODO confirm how errors surface.
131  auto addr = ip::address::from_string (addr_parts.first);
132  addresses_.emplace_back (addr, addr_parts.second);
133 
 // NOTE(review): the opening line of this logging call (embedded line
 // 134) is not present in this listing.
135  "UdpRegistryClient::send_data:" \
136  " adding %s:%d\n",
137  addresses_.back ().address ().to_string ().c_str (),
138  addresses_.back ().port ());
139  }
140 
 // Registration is repeated on every send while sending is enabled.
141  send_register ();
142  }
143  return UdpTransport::send_data (orig_updates);
144 }
145 
146 } }
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeader).
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
QoSTransportSettings settings_
Definition: Transport.h:133
int setup(void) override
All subclasses should call this method at the end of their setup.
std::pair< std::string, uint16_t > parse_address(std::string addr)
Definition: Utility.cpp:789
knowledge::containers::Map endpoints_
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
uint32_t updates
the number of knowledge variable updates in the message
std::vector< udp::endpoint > servers_
registry servers
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:516
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > sync_keys(void)
Syncs the keys from the knowledge base.
Definition: Map.cpp:225
long send_buffer(const udp::endpoint &target, const char *buf, size_t size)
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
uint32_t type
the type of message
static struct madara::knowledge::tags::string_t string
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
std::string write_domain
All class members are accessible to users for easy setup.
uint64_t size
the size of this header plus the updates
void set_name(const std::string &var_name, KnowledgeBase &knowledge, bool sync=true)
Sets the variable name that this refers to.
Definition: Map.cpp:415
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
void send_register(void)
Sends register messages to all servers.
std::vector< udp::endpoint > addresses_
holds all multicast addresses we are sending to
long send_data(const madara::knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
std::map< const char *, VariableReference, VariableReferenceMapCompare > VariableReferenceMap
a map of variable references
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
uint32_t queue_length
Length of the buffer used to store history of events.
UdpRegistryClient(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
int setup() override
All subclasses should call this method at the end of their setup.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
uint64_t clock
the clock of the sender when the message was generated
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:154
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:57
UDP-based transport for knowledge.
Definition: UdpTransport.h:37
long send_data(const madara::knowledge::VariableReferenceMap &updates) override
Sends a list of knowledge updates to listeners.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:145
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:136
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...