MADARA  3.4.1
PacketScheduler.cpp
Go to the documentation of this file.
1 #include "PacketScheduler.h"
4 
/// Numerator used to turn a ticket count into a stride
/// (stride = max_stride / tickets).
const uint64_t max_stride(150000000);

/**
 * @class StrideTask
 * @brief INTERNAL USE: Task that can be added to a Stride scheduler
 **/
class StrideTask
{
public:
  /**
   * Constructor
   * @param  rate  the ticket rate of the task (see set_rate)
   * @param  op    the operation flag carried by the task
   **/
  StrideTask(double rate, bool op) : operation(op)
  {
    set_rate(rate);
  }

  /**
   * Ordering used by the scheduler's queue. The comparison is inverted on
   * purpose: a task is "less" when its pass is HIGHER, presumably so that a
   * max-first priority queue surfaces the task with the lowest pass.
   * @param  rhs  the task to compare against
   * @return true if this task's pass is greater than rhs's pass
   **/
  bool operator<(const StrideTask& rhs) const
  {
    return pass > rhs.pass;
  }

  /**
   * Checks for pass equality
   * @param  rhs  the task to compare against
   * @return true if both tasks have the same pass
   **/
  bool operator==(const StrideTask& rhs) const
  {
    return pass == rhs.pass;
  }

  /**
   * Mirror image of operator<: a task is "greater" when its pass is LOWER.
   * @param  rhs  the task to compare against
   * @return true if this task's pass is lower than rhs's pass
   **/
  bool operator>(const StrideTask& rhs) const
  {
    return pass < rhs.pass;
  }

  /**
   * Increases pass by stride
   **/
  void operator++(void)
  {
    pass += stride;
  }

  /**
   * Sets the ticket rate, which determines the stride, and restarts pass
   * at one stride.
   *
   * tickets is the rate scaled by 1,000,000 and truncated. Rates whose
   * scaled value is below 1 (including negative rates and NaN) yield zero
   * tickets, and values too large for uint64_t saturate; converting such
   * doubles directly to an unsigned integer would be undefined behavior.
   *
   * @param  rate  the ticket rate of the task
   **/
  void set_rate(double rate)
  {
    const double scaled = 1000000 * rate;
    const uint64_t ceiling = ~static_cast<uint64_t>(0);

    if (!(scaled >= 1.0))
    {
      // also catches NaN, for which every comparison is false
      tickets = 0;
    }
    else if (scaled >= static_cast<double>(ceiling))
    {
      tickets = ceiling;
    }
    else
    {
      tickets = static_cast<uint64_t>(scaled);
    }

    // NOTE(review): a task with zero tickets gets the smallest stride (1)
    // and is therefore scheduled most often -- confirm this is intended.
    if (tickets > 0)
      stride = max_stride / tickets;
    else
      stride = 1;

    pass = stride;
  }

  /// the stride to take
  uint64_t stride;

  /// the current pass which determines next schedule
  uint64_t pass;

  /// the number of tickets, which influences stride
  uint64_t tickets;

  /// the type of operation
  bool operation;
};
87 
89  const QoSTransportSettings* settings)
 // Constructor taking the settings pointer: the pointer is stored as given
 // (it may be null; add() and reset() test settings_ before using it) and
 // the sent, dropped and consecutive-drop counters start at zero.
90  : settings_(settings),
91  sent_messages_(0),
92  dropped_messages_(0),
93  consecutive_drops_(0)
94 {
95 }
96 
 // Copy construction: takes over rhs's settings pointer and its three
 // counters. queue_ and mutex_ are not named in this initializer list, so
 // the scheduling queue of rhs is not carried over here.
98  : settings_(rhs.settings_),
99  sent_messages_(rhs.sent_messages_),
100  dropped_messages_(rhs.dropped_messages_),
101  consecutive_drops_(rhs.consecutive_drops_)
102 {
103 }
104 
106 
108 {
 // Assignment: copies the settings pointer and the three counters from rhs
 // while holding this object's mutex_. Self-assignment changes nothing.
 // queue_ is not touched by this function, and rhs is read without taking
 // any lock on rhs.
109  MADARA_GUARD_TYPE guard(mutex_);
110  if (this != &rhs)
111  {
112  settings_ = rhs.settings_;
113  sent_messages_ = rhs.sent_messages_;
114  dropped_messages_ = rhs.dropped_messages_;
115  consecutive_drops_ = rhs.consecutive_drops_;
116  }
117 }
118 
120 {
 // Decides the fate of the next message under the scheduler lock.
 // Returns true when the message counts as sent and false when it counts
 // as dropped, and updates sent_messages_, dropped_messages_ and
 // consecutive_drops_ to match. With no settings attached the function
 // returns true and leaves the counters untouched.
121  MADARA_GUARD_TYPE guard(mutex_);
122 
123  if (settings_)
124  {
 // read the drop policy parameters from the attached settings
128  double drop_rate = settings_->get_drop_rate();
129  int drop_type = settings_->get_drop_type();
130  uint64_t drop_burst = settings_->get_drop_burst();
131 
 // default verdict is "send"; it only changes when a drop rule fires
132  bool result = true;
133 
134  // if the user has specified a positive drop rate
135  if (drop_rate > 0)
136  {
 // Forced drop: either the rate is 100 or more, or a burst is in
 // progress (at least one drop just happened, bursts are enabled, and
 // fewer than drop_burst consecutive drops have occurred so far).
 // NOTE(review): the >= 100 test reads like a percentage while the
 // probabilistic branch compares the rate with rand_double(), whose
 // default range is not visible here -- confirm the intended scale.
142  if (drop_rate >= 100 || (consecutive_drops_ > 0 && drop_burst > 1 &&
143  consecutive_drops_ < drop_burst))
144  {
145  result = false;
146  }
147  else
148  {
149  if (drop_type == PACKET_DROP_PROBABLISTIC)
150  {
151  // if burst is greater than 1, then divide the rate by (burst - 1)
152  if (drop_burst > 1)
153  drop_rate = drop_rate / (drop_burst - 1);
154 
155  // easiest drop rate policy. Just drop messages with a probability.
156  if (utility::rand_double() <= drop_rate)
157  {
158  result = false;
159  }
160  }
161  else
162  {
 // Any other drop type: the verdict comes from the tasks in queue_.
163  // Reset queue if this is the first time we have used this policy
164  if (queue_.empty())
165  {
 // reset() takes mutex_ again while this function still holds it
166  reset();
167  }
168 
169  // remove current operation from the priority queue
170  StrideTask current = queue_.top();
171  queue_.pop();
172 
173  // set the result to the current operation
174  result = current.operation;
175 
176  // reinsert the operation into the queue
 // (++ adds the task's stride to its pass before it is pushed back)
177  ++current;
178  queue_.push(current);
179  }
180  }
181  }
182 
 // bookkeeping: a send ends any drop streak, a drop extends it
183  if (result)
184  {
185  ++sent_messages_;
186  consecutive_drops_ = 0;
187  }
188  else
189  {
190  ++dropped_messages_;
191  ++consecutive_drops_;
192  }
193 
194  return result;
195  } // end if settings_
196  else
197  {
 // no settings attached: the error text below is reported and the
 // message is allowed through without touching any counter
199  "PacketScheduler::add: ERROR: Invalid settings class\n");
200 
201  return true;
202  }
203 }
204 
206  const QoSTransportSettings* settings)
207 {
208  settings_ = settings;
209 }
210 
212 {
 // Clears the scheduler under the lock: zeroes the sent, dropped and
 // consecutive-drop counters and pops every task from queue_. The
 // attached settings pointer is left as it is.
213  MADARA_GUARD_TYPE guard(mutex_);
214 
215  sent_messages_ = 0;
216  dropped_messages_ = 0;
217  consecutive_drops_ = 0;
218  while (!queue_.empty())
219  queue_.pop();
220 }
221 
223 {
 // Rebuilds queue_ from the attached settings under the lock: the queue is
 // emptied and refilled with one "drop" task and one "send" task whose
 // rates are derived from the configured drop rate. The counters are not
 // modified. Without settings only the error text below is reported.
224  MADARA_GUARD_TYPE guard(mutex_);
225 
226  if (settings_)
227  {
228  double drop_rate = settings_->get_drop_rate();
229  uint64_t burst = settings_->get_drop_burst();
230 
231  // if burst is greater than 1, then divide the rate by (burst - 1)
232  if (burst > 1)
233  drop_rate = drop_rate / (burst - 1);
234 
 // operation == false marks the drop task, operation == true the send
 // task; add() returns the operation flag of the task it picks.
 // NOTE(review): 1 - drop_rate is negative whenever drop_rate exceeds 1
 // -- confirm the rate is always a fraction when this policy is used.
235  StrideTask drop_message(drop_rate, false);
236  StrideTask send_message(1 - drop_rate, true);
237 
 // discard whatever was scheduled before
238  while (!queue_.empty())
239  queue_.pop();
240 
241  queue_.push(drop_message);
242  queue_.push(send_message);
243  }
244  else
245  {
247  "PacketScheduler::reset: ERROR: Invalid settings class\n");
248  }
249 }
250 
252  unsigned int log_level, const char* prefix)
253 {
 // Reports the three counters in one line, prefixed with the caller's
 // prefix string, while holding the lock so the values are read together.
254  MADARA_GUARD_TYPE guard(mutex_);
255 
 // NOTE(review): the counters are documented as uint64_t but are
 // formatted with %d -- confirm the logger handles this, or switch to a
 // 64-bit conversion specifier.
257  "%s: %d sent, %d dropped, %d consec dropped\n", prefix, sent_messages_,
258  dropped_messages_, consecutive_drops_);
259 }
260 
262 {
 // Returns the dropped-message counter, read under the lock.
263  MADARA_GUARD_TYPE guard(mutex_);
264  return dropped_messages_;
265 }
266 
268 {
 // Returns the sent-message counter, read under the lock. This is the
 // running count maintained by add(), not a per-second rate.
269  MADARA_GUARD_TYPE guard(mutex_);
270  return sent_messages_;
271 }
#define madara_logger_ptr_log(loggering, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:41
const uint64_t max_stride(150000000)
INTERNAL USE: Task that can be added to a Stride scheduler
bool operation
the type of operation
uint64_t stride
the stride to take
void operator++(void)
Increases pass by stride.
uint64_t tickets
the number of tickets, which influences stride
bool operator<(const StrideTask &rhs) const
Checks for lower pass value.
uint64_t pass
the current pass which determines next schedule
StrideTask(double rate, bool op)
Constructor.
bool operator>(const StrideTask &rhs) const
Checks for greater pass value.
void set_rate(double rate)
Sets the ticket rate, which influences stride.
bool operator==(const StrideTask &rhs) const
Checks for pass equality.
Provides scheduler for dropping packets.
uint64_t consecutive_drops_
Consecutive dropped messages.
void reset(void)
Resets the packet scheduler to current settings.
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the status of the packet scheduler (sent, dropped, and consecutively dropped message counts).
void clear(void)
Clears the packet scheduler.
uint64_t get_dropped(void)
Queries the monitor for the current dropped messages.
uint64_t dropped_messages_
Number of dropped messages.
PacketScheduler(const QoSTransportSettings *settings=0)
Default constructor.
void operator=(const PacketScheduler &rhs)
Assignment operator.
bool add(void)
Adds a message to the monitor.
uint64_t sent_messages_
Number of sent messages.
const QoSTransportSettings * settings_
Transport settings.
void attach(const QoSTransportSettings *settings)
Attaches settings.
uint64_t get_sent(void)
Queries the scheduler for the number of messages sent so far (a running count, not a per-second rate over a window).
Container for quality-of-service settings.
T get(const KnowledgeRecord &kr)
Get the value of a KnowledgeRecord.
Definition: GetRecord.h:121
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
double rand_double(double floor, double ceiling, bool set_seed_to_time)
Returns a random double between floor and ceiling.
Definition: Utility.cpp:520