feisty meow concerns codebase  2.140
test_bin_threaded.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : test_entity_data_bin_threaded *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2010-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
17 #include <basis/astring.h>
18 #include <basis/byte_array.h>
19 #include <basis/functions.h>
20 #include <basis/guards.h>
21 #include <basis/mutex.h>
23 #include <loggers/console_logger.h>
24 #include <mathematics/chaos.h>
26 #include <octopus/entity_defs.h>
28 #include <processes/ethread.h>
29 #include <processes/safe_roller.h>
30 #include <structures/amorph.h>
34 #include <timely/time_control.h>
35 #include <timely/time_stamp.h>
36 #include <unit_test/unit_base.h>
37 
38 #include <stdio.h>
39 
40 #ifdef __WIN32__
41  #include <process.h>
42 #endif
43 
44 using namespace application;
45 using namespace basis;
46 using namespace loggers;
47 using namespace mathematics;
48 using namespace octopi;
49 using namespace processes;
50 using namespace structures;
51 using namespace textual;
52 using namespace timely;
53 using namespace unit_test;
54 
56 
57 // synchronization for logged messages to avoid overwriting on the console.
58 SAFE_STATIC(mutex, __loggers_lock, )
59 
60 // our macros for logging (with or without a timestamp).
61 #define LOG(s) { \
62  auto_synchronizer critical_section(__loggers_lock()); \
63  CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s)); \
64 }
65 
67 
68 // the base log feature just prints the text to the console with no carriage return or extra flair.
69 // it does count up how many characters have been printed though, and does an EOL when it seems like it would be reasonable (80 chars-ish).
70 // note that this code makes no attempt to worry about any other printing that's happening; it has a very egocentric view of what's on the
71 // terminal so far.
72 static int chars_printed = 0;
73 const int MAXIMUM_CHARS_PER_LINE = 108;
75 #define BASE_LOG(s) { \
76  auto_synchronizer critical_section(__loggers_lock()); \
77  /* we set the eol every time, since console_logger constructor doesn't currently provide. */ \
78  ted().eol(parser_bits::NO_ENDING); \
79  astring joe(s); \
80  int len = joe.length(); \
81  /* naive check for line length. */ \
82  if (chars_printed + len > MAXIMUM_CHARS_PER_LINE) { \
83  ted().log(astring("\n"), basis::ALWAYS_PRINT); \
84  chars_printed = 0; \
85  } \
86  chars_printed += len; \
87  ted().log(joe, basis::ALWAYS_PRINT); \
88 }
89 //hmmm: may want to make the line size selectable, if we keep some version of the above line handling code around.
90 
92 
93 // protects our logging stream really, by keeping all the threads chomping at the bit rather
94 // than running right away. when this flag switches to true, then *bam* they're off.
95 class bool_scared_ya : public root_object {
96 public:
97  bool_scared_ya(bool init = false) : _value(init) {}
98  bool_scared_ya &operator = (const bool_scared_ya &s1) { _value = s1._value; return *this; }
99  bool_scared_ya &operator = (bool s1) { _value = s1; return *this; }
100  virtual ~bool_scared_ya() {}
101  operator bool() { return _value; }
102  DEFINE_CLASS_NAME("bool_scared_ya");
103 private:
104  bool _value;
105 };
106 SAFE_STATIC(bool_scared_ya, __threads_can_run_wild_and_free, (false));
107 
109 
110 // global constants...
111 
112 //const int DEFAULT_RUN_TIME = 80 * MINUTE_ms;
113 //const int DEFAULT_RUN_TIME = 2 * MINUTE_ms;
114 //const int DEFAULT_RUN_TIME = 28 * SECOND_ms;
115 const int DEFAULT_RUN_TIME = 4 * SECOND_ms;
116  // the length of time to run the program.
117 
118 // how much data is the entity data bin allowed to hold at one time.
120 //tiny limit to test having too much data.
121 
122 // controls the timing of the thread that adds items.
124 const int MAX_ADDER_THREAD_PAUSE = 20;
125 
126 // controls the timing of the item deleting thread.
127 // we currently have this biased to be slower than the adder, so things accumulate.
130 
131 // bound the randomly chosen pause time for the cleanup thread.
132 const int MIN_TIDIER_THREAD_PAUSE = 60;
133 const int MAX_TIDIER_THREAD_PAUSE = 500;
134 
135 // monk is kept asleep most of the time or he'd be trashing all our data too frequently.
138 
139 // the range of new items added whenever the creator or destroyer threads are hit.
140 const int MINIMUM_ITEMS_HANDLED = 1;
141 const int MAXIMUM_ITEMS_HANDLED = 20;
142 
143 const int DEFAULT_THREADS = 90;
144  // the number of threads we create by default.
145 
146 const int DATA_DECAY_TIME = 1 * MINUTE_ms;
147  // how long we retain unclaimed data.
148 
150  // a very short duration for data to live.
151 
153 
154 // global objects...
155 
156 SAFE_STATIC(chaos, _rando, );
158 /* replaces app_shell version with local static randomizer. */
159 #define randomizer() _rando()
160 
162 
164 
166 {
167  // test the basic filling of the values in an entity.
168  octopus_request_id req_id;
169  if (randomizer().inclusive(1, 100) < 25) {
170  // some of the time we make a totally random entity id.
171  int sequencer = randomizer().inclusive(1, MAXINT32 - 10);
172  int add_in = randomizer().inclusive(0, MAXINT32 - 10);
173  int process_id = randomizer().inclusive(0, MAXINT32 - 10);
174  req_id._entity = octopus_entity(string_manipulation::make_random_name(),
175  process_id, sequencer, add_in);
176  } else {
177  // sometimes we use a less random identity.
178  int sequencer = randomizer().inclusive(1, 3);
179  int add_in = 12;
180  int process_id = randomizer().inclusive(1, 4);
181  req_id._entity = octopus_entity("boringentity",
182  process_id, sequencer, add_in);
183  }
184  req_id._request_num = randomizer().inclusive(1, MAXINT32 - 10);
185  return req_id;
186 }
187 
189 
190 // this thread creates new items for the entity data bin.
191 // also known as the adder.
192 class ballot_box_stuffer : public ethread
193 {
194 public:
195  ballot_box_stuffer() : ethread(MIN_ADDER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
196  FUNCDEF("constructor");
197  LOG(">> new creator >>");
198  }
199 
200  virtual ~ballot_box_stuffer() {
201  FUNCDEF("destructor");
202  LOG("<< creator exits <<");
203  }
204 
205  DEFINE_CLASS_NAME("ballot_box_stuffer");
206 
207  void perform_activity(void *formal(data)) {
208  FUNCDEF("perform_activity");
209  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_ADDER_THREAD_PAUSE); return; }
210  // add a new item to the cache.
211  int how_many = randomizer().inclusive(MINIMUM_ITEMS_HANDLED,
213  for (int i = 0; i < how_many; i++) {
214  string_array random_strings;
215  int string_count = randomizer().inclusive(1, 10);
216  // we create a random classifier, just to use up some space.
217  for (int q = 0; q < string_count; q++) {
218  random_strings += string_manipulation::make_random_name();
219  }
220  // check exit sentry again just before adding.
221  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_ADDER_THREAD_PAUSE); return; }
223  random_strings);
224  binger.add_item(newbert, create_request_id());
225  BASE_LOG("+");
226  }
227 
228  // snooze.
229  int sleepy_time = randomizer().inclusive(MIN_ADDER_THREAD_PAUSE,
231  time_control::sleep_ms(sleepy_time);
232  // reset the thread's snooze timing.
233  ethread::sleep_time(sleepy_time);
234  }
235 };
236 
238 
239 // this thread eliminates entries in the ballot box.
240 // also known as the whacker.
241 class vote_destroyer : public ethread
242 {
243 public:
244  vote_destroyer() : ethread(MIN_WHACKER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
245  FUNCDEF("constructor");
246  LOG(">> new destroyer >>");
247  }
248 
249  virtual ~vote_destroyer() {
250  FUNCDEF("destructor");
251  LOG("<< destroyer exits <<");
252  }
253 
254  DEFINE_CLASS_NAME("vote_destroyer");
255 
256  void perform_activity(void *formal(data)) {
257  FUNCDEF("perform_activity");
258  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_WHACKER_THREAD_PAUSE); return; }
259  int how_many = randomizer().inclusive(MINIMUM_ITEMS_HANDLED,
261  for (int i = 0; i < how_many; i++) {
262  // check exit sentry again just before removing.
263  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_WHACKER_THREAD_PAUSE); return; }
264  // snag any old item and drop it on the floor.
266  infoton *found = binger.acquire_for_any(id);
267  if (!found) break; // nothing to whack there.
268  WHACK(found);
269  BASE_LOG("-");
270  }
271  // snooze.
272  int sleepy_time = randomizer().inclusive(MIN_WHACKER_THREAD_PAUSE,
274  time_control::sleep_ms(sleepy_time);
275  // re-schedule the thread.
276  ethread::sleep_time(sleepy_time);
277  }
278 };
279 
281 
282 // this class makes sure the deadwood is cleaned out of the entity bin.
283 class obsessive_compulsive : public ethread
284 {
285 public:
286  obsessive_compulsive() : ethread(MIN_TIDIER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
287  FUNCDEF("constructor");
288  LOG(">> new cleaner >>");
289  }
290 
291  virtual ~obsessive_compulsive() {
292  FUNCDEF("destructor");
293  LOG("<< cleaner exits <<");
294  }
295 
296  DEFINE_CLASS_NAME("obsessive_compulsive");
297 
298  void perform_activity(void *formal(data)) {
299  FUNCDEF("perform_activity");
300  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_TIDIER_THREAD_PAUSE); return; }
301  // make sure there's nothing rotting too long.
303  // snooze.
304  int sleepy_time = randomizer().inclusive(MIN_TIDIER_THREAD_PAUSE,
306  time_control::sleep_ms(sleepy_time);
307  ethread::sleep_time(sleepy_time);
308  }
309 };
310 
312 
313 // this thread will destroy all data in the bins while cleaning furiously.
314 class monk_the_detective : public ethread
315 {
316 public:
317  monk_the_detective() : ethread(MIN_MONK_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
318  FUNCDEF("constructor");
319  LOG(">> new monk >>");
320  }
321 
322  virtual ~monk_the_detective() {
323  FUNCDEF("destructor");
324  LOG("<< monk exits <<");
325  }
326 
327  DEFINE_CLASS_NAME("monk_the_detective");
328 
329  void perform_activity(void *formal(data)) {
330  FUNCDEF("perform_activity");
331  if (!__threads_can_run_wild_and_free() || !_hit_first_activation) {
332  _hit_first_activation = true; // we've seen an activation now for sure.
333  sleep_time(MIN_MONK_THREAD_PAUSE);
334  return;
335  } else {
336  // still record that we've seen an activation, so we're not gated by the wild & free sentinel for correctness.
337  _hit_first_activation = true;
338  }
339 
340  // one activation of monk has devastating consequences. we empty out
341  // the data one item at a time until we see no data at all. after
342  // cleaning each item, we ensure that the deadwood is cleaned out.
343  auto_synchronizer l(binger.locker());
344 LOG(a_sprintf("monk sees %d items and will clean them all.", binger.items_held()));
345  int check_count = 0;
346  while (binger.items_held()) {
347  // check exit sentry again just before obsessing over cleanliness.
348  if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_MONK_THREAD_PAUSE); return; }
349  // grab one instance of any item in the bin.
351  infoton *found = binger.acquire_for_any(id);
352  if (!found) break; // nothing to see here.
353  check_count++;
354  WHACK(found);
355  BASE_LOG("-");
356  // also clean out things a lot faster than normal.
358 //hmmm: interesting--deadwood cleaning above will possibly cause us not to clean out the number of items reported above for items_held().
359  }
360 LOG(a_sprintf("monk manually cleaned %d items very carefully...", check_count));
361 LOG(a_sprintf("after this little light cleaning, monk sees %d items held.", binger.items_held()));
362  // snooze.
363  int sleepy_time = randomizer().inclusive(MIN_MONK_THREAD_PAUSE, MAX_MONK_THREAD_PAUSE);
364  // reschedule the thread for the new snooze. and note how we are not actually stuck waiting
365  // for the whole sleep time, given how ethread works with timed threads. an old implementation
366  // actually slept uninterruptably for the whole snooze time, which was really off-putting and
367  // rude.
368  ethread::sleep_time(sleepy_time);
369  }
370 
371 private:
372  bool _hit_first_activation;
373  /* if this is true, then we can start running normally. we don't fire on the first activation of the thread,
374  because we want monk to only start after its minimum pause period, rather than getting cranked up right away.
375  there is nothing for him to do at program inception anyhow. */
376 };
377 
379 
380 class test_entity_data_bin_threaded : public application_shell
381 {
382 public:
383  test_entity_data_bin_threaded() : application_shell() {}
384 
385  DEFINE_CLASS_NAME("test_entity_data_bin_threaded");
386 
387  int execute();
388 };
389 
390 int test_entity_data_bin_threaded::execute()
391 {
392  FUNCDEF("execute");
393 
394  int duration = DEFAULT_RUN_TIME;
395  if (application::_global_argc >= 2) {
396  astring duration_string = application::_global_argv[1];
397  if (duration_string.length()) {
398  duration = duration_string.convert(DEFAULT_RUN_TIME);
399  LOG(a_sprintf("user specified runtime duration of %d seconds.", duration));
400  // convert from seconds to milliseconds.
401  duration *= SECOND_ms;
402  }
403  }
404 
405  /* we could use a thread_cabinet here, but it's kind of funny that we're using a bare amorph and
406  that its built-in reset() method is shutting down all the threads for us. so leaving this as
407  a nice example. */
408  amorph<ethread> thread_list;
409 
410  for (int i = 0; i < DEFAULT_THREADS; i++) {
411  ethread *t = NULL_POINTER;
412  if (i == DEFAULT_THREADS - 1) {
413  // last item gets special treatment; we reserve this space for monk.
414  t = new monk_the_detective;
415  } else if (i % 3 == 0) {
416  t = new ballot_box_stuffer;
417  } else if (i % 3 == 1) {
418  t = new vote_destroyer;
419  } else { // i % 3 must = 2.
420  t = new obsessive_compulsive;
421  }
422  thread_list.append(t);
423  ethread *q = thread_list[thread_list.elements() - 1];
424  if (q != t)
425  deadly_error(class_name(), func, "amorph has incorrect pointer!");
426  // start the thread we added.
427  thread_list[thread_list.elements() - 1]->start(NULL_POINTER);
428  }
429 
430  // set our sentinel variable to allow the threads to run now.
431  __threads_can_run_wild_and_free() = true;
432 
433  time_stamp when_to_leave(duration);
434  while (when_to_leave > time_stamp()) {
435  time_control::sleep_ms(20);
436  }
437 
438  __threads_can_run_wild_and_free() = false;
439 
440  /* we cancel all the threads first. this gives them an opportunity to know
441  they should shut down, and they will all go about that at their own rate. if we
442  just killed the list with reset first, then the amorph would dutifully shut the
443  threads down also, but it would do them sequentially which is way slower. */
444  LOG("now cancelling all threads...");
445  for (int j = 0; j < thread_list.elements(); j++) { thread_list[j]->cancel(); }
446  LOG("now resetting thread list...");
447  thread_list.reset(); // should whack all threads.
448  LOG("...done exiting from all threads.");
449 
450 //report the results:
451 // how many objects created.
452 // how many got destroyed.
453 // how many evaporated due to timeout.
454 
455  critical_events::alert_message(astring(class_name()) + ":: works for all functions tested.");
456  return 0;
457 }
458 
460 
461 HOOPLE_MAIN(test_entity_data_bin_threaded, )
462 
The application_shell is a base object for console programs.
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
int convert(int default_value) const
Converts the string into a corresponding integer.
Definition: astring.cpp:760
int length() const
Returns the current length of the string.
Definition: astring.cpp:132
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
a platform-independent way to acquire random numbers in a specific range.
Definition: chaos.h:51
Stores a set of infotons grouped by the entity that owns them.
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
Provides a way of identifying users of an octopus object.
Definition: entity_defs.h:35
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
int _request_num
the item number from the entity.
Definition: entity_defs.h:117
octopus_entity _entity
the entity.
Definition: entity_defs.h:116
Informs the caller that a request type was unknown to the server octopus.
Provides a platform-independent object for adding threads to a program.
Definition: ethread.h:36
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition: amorph.h:66
basis::outcome append(const contents *data)
puts "data" on the end of this amorph.
Definition: amorph.h:303
void reset()
cleans out all of the contents.
Definition: amorph.h:81
An array of strings with some additional helpful methods.
Definition: string_array.h:32
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
#define deadly_error(c, f, i)
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define MAXINT32
Maximum 32-bit integer value.
Definition: definitions.h:75
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:54
Provides macros that implement the 'main' program of an application.
#define HOOPLE_MAIN(obj_name, obj_args)
options that should work for most unix and linux apps.
Definition: hoople_main.h:61
Implements an application lock to ensure only one is running at once.
char ** _global_argv
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
const int MINUTE_ms
Number of milliseconds in a minute.
Definition: definitions.h:121
const int KILOBYTE
Number of bytes in a kilobyte.
Definition: definitions.h:134
A logger that sends to the console screen using the standard output device.
An extension to floating point primitives providing approximate equality.
Definition: averager.h:21
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37
Useful support functions for unit testing, especially within hoople.
Definition: unit_base.cpp:35
octopus_request_id create_request_id()
const int MIN_ADDER_THREAD_PAUSE
entity_data_bin binger(MAXIMUM_DATA_PER_ENTITY)
const int MINIMUM_ITEMS_HANDLED
const int MAX_MONK_THREAD_PAUSE
const int MAX_TIDIER_THREAD_PAUSE
#define LOG(s)
const int DATA_DECAY_TIME
const int DEFAULT_RUN_TIME
SAFE_STATIC(console_logger, ted,)
const int MIN_WHACKER_THREAD_PAUSE
const int MAXIMUM_ITEMS_HANDLED
const int DEFAULT_THREADS
#define randomizer()
const int MAXIMUM_CHARS_PER_LINE
const int MAX_WHACKER_THREAD_PAUSE
const int MIN_MONK_THREAD_PAUSE
#define BASE_LOG(s)
const int MAX_ADDER_THREAD_PAUSE
const int MIN_TIDIER_THREAD_PAUSE
const int MONKS_CLEANING_TIME
const int MAXIMUM_DATA_PER_ENTITY