MADARA  3.2.3
PacketScheduler.cpp
Go to the documentation of this file.
1 #include "PacketScheduler.h"
4 
5 
6 const uint64_t max_stride (150000000);
7 
12 {
13 public:
17  StrideTask (double rate, bool op)
18  : operation (op)
19  {
20  set_rate (rate);
21  }
25  bool operator< (const StrideTask & rhs) const
26  {
27  return pass > rhs.pass;
28  }
29 
33  bool operator== (const StrideTask & rhs) const
34  {
35  return pass == rhs.pass;
36  }
37 
41  bool operator> (const StrideTask & rhs) const
42  {
43  return pass < rhs.pass;
44  }
45 
49  void operator++ (void)
50  {
51  pass += stride;
52  }
53 
57  void set_rate (double rate)
58  {
59  tickets = (uint64_t)(1000000 * rate);
60 
61  if (tickets > 0)
63  else
64  stride = 1;
65 
66  pass = stride;
67  }
68 
72  uint64_t stride;
73 
77  uint64_t pass;
78 
82  uint64_t tickets;
83 
87  bool operation;
88 };
89 
91  const QoSTransportSettings * settings)
92  : settings_ (settings),
93  sent_messages_ (0),
94  dropped_messages_ (0),
95  consecutive_drops_ (0)
96 {
97 }
98 
100  const PacketScheduler & rhs)
101  : settings_ (rhs.settings_),
105 {
106 }
107 
109 {
110 }
111 
112 void
114  const PacketScheduler & rhs)
115 {
116  MADARA_GUARD_TYPE guard (mutex_);
117  if (this != &rhs)
118  {
119  settings_ = rhs.settings_;
123  }
124 }
125 
126 bool
128 {
129  MADARA_GUARD_TYPE guard (mutex_);
130 
131  if (settings_)
132  {
136  double drop_rate = settings_->get_drop_rate ();
137  int drop_type = settings_->get_drop_type ();
138  uint64_t drop_burst = settings_->get_drop_burst ();
139 
140  bool result = true;
141 
142  // if the user has specified a positive drop rate
143  if (drop_rate > 0)
144  {
150  if (drop_rate >= 100 ||
151  (consecutive_drops_ > 0 &&
152  drop_burst > 1 && consecutive_drops_ < drop_burst))
153  {
154  result = false;
155  }
156  else
157  {
158  if (drop_type == PACKET_DROP_PROBABLISTIC)
159  {
160  // if burst is greater than 1, then divide the rate by burst
161  if (drop_burst > 1)
162  drop_rate = drop_rate / (drop_burst - 1);
163 
164  // easiest drop rate policy. Just drop messages with a probability.
165  if (utility::rand_double () <= drop_rate)
166  {
167  result = false;
168  }
169  }
170  else
171  {
172  // Reset queue if this is the first time we have used this policy
173  if (queue_.empty ())
174  {
175  reset ();
176  }
177 
178  // remove current operation from the priority queue
179  StrideTask current = queue_.top ();
180  queue_.pop ();
181 
182  // set the result to the current operation
183  result = current.operation;
184 
185  // reinsert the operation into the queue
186  ++current;
187  queue_.push (current);
188  }
189  }
190  }
191 
192  if (result)
193  {
194  ++sent_messages_;
195  consecutive_drops_ = 0;
196  }
197  else
198  {
201  }
202 
203  return result;
204  } // end if settings_
205  else
206  {
208  "PacketScheduler::add: ERROR: Invalid settings class\n");
209 
210  return true;
211  }
212 }
213 
214 void
216  const QoSTransportSettings * settings)
217 {
218  settings_ = settings;
219 }
220 
221 void
223 {
224  MADARA_GUARD_TYPE guard (mutex_);
225 
226  sent_messages_ = 0;
227  dropped_messages_ = 0;
228  consecutive_drops_ = 0;
229  while (!queue_.empty ())
230  queue_.pop ();
231 }
232 
233 void
235 {
236  MADARA_GUARD_TYPE guard (mutex_);
237 
238  if (settings_)
239  {
240  double drop_rate = settings_->get_drop_rate ();
241  uint64_t burst = settings_->get_drop_burst ();
242 
243  // if burst is greater than 1, then divide the rate by burst
244  if (burst > 1)
245  drop_rate = drop_rate / (burst - 1);
246 
247  StrideTask drop_message (drop_rate, false);
248  StrideTask send_message (1 - drop_rate, true);
249 
250  while (!queue_.empty ())
251  queue_.pop ();
252 
253  queue_.push (drop_message);
254  queue_.push (send_message);
255  }
256  else
257  {
259  "PacketScheduler::reset: ERROR: Invalid settings class\n");
260  }
261 }
262 
263 void
265  unsigned int log_level,
266  const char * prefix)
267 {
268  MADARA_GUARD_TYPE guard (mutex_);
269 
270  madara_logger_ptr_log (logger::global_logger.get(), (int)log_level,
271  "%s: %d sent, %d dropped, %d consec dropped\n",
273 }
274 
275 uint64_t
277 {
278  MADARA_GUARD_TYPE guard (mutex_);
279  return dropped_messages_;
280 }
281 
282 uint64_t
284 {
285  MADARA_GUARD_TYPE guard (mutex_);
286  return sent_messages_;
287 }
std::priority_queue< StrideTask, std::vector< StrideTask > > queue_
queue used by stride scheduling algorithm
void operator=(const PacketScheduler &rhs)
Assignment operator.
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the number of status of the packet scheduler.
const uint64_t max_stride(150000000)
int get_drop_type(void) const
Returns the policy type for packet drops.
uint64_t consecutive_drops_
Consecutive dropped messages.
bool operation
the type of operation
bool operator<(const StrideTask &rhs) const
Checks for lower pass value.
Provides scheduler for dropping packets.
MADARA_EXPORT utility::Refcounter< logger::Logger > global_logger
bool operator>(const StrideTask &rhs) const
Checks for greater pass value.
PacketScheduler(const QoSTransportSettings *settings=0)
Default constructor.
bool add(void)
Adds a message to the monitor.
void set_rate(double rate)
Sets the ticket rate, which influences stride.
uint64_t tickets
the number of tickets, which influences stride
uint64_t get_drop_burst(void) const
Returns the bursts of packet drops.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
uint64_t stride
the stride to take
const QoSTransportSettings * settings_
Transport settings.
void clear(void)
Clears the packet scheduler.
bool operator==(const StrideTask &rhs) const
Checks for pass equality.
void attach(const QoSTransportSettings *settings)
Attaches settings.
virtual ~PacketScheduler()
Destructor.
uint64_t dropped_messages_
Number of dropped messages.
uint64_t get_dropped(void)
Queries the monitor for the current dropped messages.
MADARA_LOCK_TYPE mutex_
Mutex for supporting multithreaded monitor calls.
double rand_double(double floor, double ceiling, bool set_seed_to_time)
Returns a random double between floor and ceiling.
Definition: Utility.cpp:580
Container for quality-of-service settings.
void reset(void)
Resets the packet scheduler to current settings.
StrideTask(double rate, bool op)
Constructor.
uint64_t pass
the current pass which determines next schedule
double get_drop_rate(void) const
Returns the percentage of dropped packets to enforce on sends.
INTERNAL USE: Task that can be added to a Stride scheduler
void operator++(void)
Increases pass by stride.
uint64_t sent_messages_
Number of sent messages.
uint64_t get_sent(void)
Queries the monitor for the current sent messages per second over the past window.