feisty meow concerns codebase  2.140
test_cromp_client.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : test_cromp_client *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "crompish_pax.h"
16 
17 #include <mathematics/chaos.h>
18 #include <basis/astring.h>
19 
23 #include <cromp/cromp_client.h>
24 #include <filesystem/filename.h>
25 #include <loggers/console_logger.h>
26 #include <loggers/file_logger.h>
29 #include <octopus/entity_defs.h>
30 #include <octopus/infoton.h>
31 #include <processes/ethread.h>
33 #include <processes/rendezvous.h>
37 #include <structures/set.h>
38 #include <timely/time_control.h>
39 #include <unit_test/unit_base.h>
40 
41 #include <stdlib.h>
42 
43 using namespace application;
44 using namespace basis;
45 using namespace configuration;
46 using namespace cromp;
47 using namespace mathematics;
48 using namespace filesystem;
49 using namespace loggers;
50 using namespace octopi;
51 using namespace processes;
52 using namespace sockets;
53 using namespace structures;
54 using namespace textual;
55 using namespace timely;
56 using namespace unit_test;
57 
58 #undef LOG
59 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s))
60 
61 #define DEBUG_TESTER
62  // uncomment for noisier version.
63 
64 // the number of transactions to send during a test. if timing connection
65 // duration, then use a maximum of 1. if timing speed of operation once
66 // connected, use a large number.
67 //const int MAXIMUM_SENDS = 10008;
68 const int MAXIMUM_SENDS = 100008;
69 //const int MAXIMUM_SENDS = 10000008;
70  // have had success with up to 10000000 sends using small data segments.
71 
72 const int NUMBER_OF_THREADS = 1;
73 //const int NUMBER_OF_THREADS = 10;
74 //const int NUMBER_OF_THREADS = 20;
75  // the number of simultaneous actors on the single cromp_client.
76 
77 //const int GRABBER_THREADS = 5;
78 const int GRABBER_THREADS = 0;
79  // the number of threads that just pluck at the cromp_client trying to
80  // interfere with the testing threads.
81 
82 //const int MAX_SEND_TRIES = 0; // don't pause.
83 const int MAX_SEND_TRIES = 1; // try to get stuff out but don't wait long.
84 //const int MAX_SEND_TRIES = 5; // wait a reasonable amount of times to send.
85 //const int MAX_SEND_TRIES = 10000; // force it to get out, hopefully.
86  // the number of times we try to push the sends out. zero means never
87  // try to push anything, just add it to the buffer. 1 or more is that
88  // many tries to push the send.
89 
90 //const int CHECKPOINT_SIZE = 1000;
91 //const int CHECKPOINT_SIZE = 100;
92 const int CHECKPOINT_SIZE = 100;
93  // prints a counter out when we reach a multiple of this many sends.
94 
95 //const int DATA_SEGMENT_SIZE = 0;
96 const int DATA_SEGMENT_SIZE = 64;
97 //const int DATA_SEGMENT_SIZE = 128 * KILOBYTE;
98 //const int DATA_SEGMENT_SIZE = 1 * MEGABYTE;
99  // the chunk size that we attach.
100 
101 const int REPORTING_INTERVAL = 10 * SECOND_ms;
102  // this is the period between reports on how the test is going.
103 
104 //***this is where we are in testing the faux dual cpu problem.
105 //***the longer delay shows the problem more easily. the shorter delay
106 //***is being used for a long test.
108 //const int MAXIMUM_ACQUISITION_DELAY = int(0.5 * SECOND_ms);
109  // the longest we'll snooze off waiting for pending receptions to occur.
110 
111 //const int MAXIMUM_PENDING_REQUESTS = 1;
112 const int MAXIMUM_PENDING_REQUESTS = 108;
113  // this is a threshold for the number of requests; once hit, we start
114  // awaiting the responses.
115 
116 //const int PENDING_REQUESTS_FORCED = 3;
117 //const int PENDING_REQUESTS_FORCED = 80;
119  // when we've been forced to gather some pending responses to previous
120  // requests, this is how many we'll try to get at once. numbers closer
121  // to the MAXIMUM_PENDING_REQUESTS will force more synchrony.
122 
123 const int CHANCE_OF_RECONSTRUCT = 14;
124  // how frequently a bus reconstruction occurs, in 1000.
125 
126 class cromp_client_tester : virtual public unit_base, virtual public application_shell
127 {
128 public:
129  cromp_client_tester();
130  ~cromp_client_tester();
131 
132  virtual int execute();
133 
134  DEFINE_CLASS_NAME("cromp_client_tester");
135 
136  void bite_server(structures::set<octopus_request_id> &ids,
137  structures::set<octopus_request_id> &delinquents, void *originator);
138  // performs the big chunk of testing. the "ids" are the history of the
139  // sends that were made and they're managed by this method. the
140  // "originator" is a tag we can use to generate unique print outs.
141 
142  int print_instructions();
144 
145  void grab_items();
147 
148  void cause_object_reconstruction();
150 
151  void increment_thread_count() {
152  FUNCDEF("increment_thread_count");
153  auto_synchronizer l(*_lock);
154  _threads_active++;
155 //LOG(a_sprintf("count now %d", _threads_active));
156  }
157 
158  void decrement_thread_count() {
159  FUNCDEF("decrement_thread_count");
160  auto_synchronizer l(*_lock);
161  _threads_active--;
162 //LOG(a_sprintf("count now %d", _threads_active));
163  }
164 
165  void report(const time_stamp &start_time, double bytes_transmitted,
166  double conversations);
167  // describes how the test is going.
168 
169 private:
170  cromp_client *_uplink; // provides the connection and transmission services.
171 
172  mutex *_lock; // protects the objects below.
173  int _threads_active; // the number of transmitter threads running.
174  time_stamp _last_report; // when we last reported on progress.
175  double _finished_loops; // counts number of loops we've achieved.
176  bool _encryption; // true if we're encrypting.
177  int _send_count;
178  int _thread_count;
179  int _grabber_count;
180  int _send_tries;
181  int _checkpoint_count;
182  int _dataseg_size;
183  int _report_interval;
184  int _snooze_duration;
185  bool _rpc_style;
186  bool _reconstruct_object;
187  internet_address _server_loc;
188 
189  void look_for_receipts(int count, structures::set<octopus_request_id> &ids,
190  structures::set<octopus_request_id> &delinquents, bool wait = false);
191  // attempts to get "count" items from the list of "ids".
192 };
193 
195 
196 class bitey_thread : public ethread
197 {
198 public:
199  bitey_thread(cromp_client_tester &parent)
200  : ethread(), _parent(parent) {}
201 
202  void perform_activity(void *formal(ptr)) {
203  FUNCDEF("perform_activity");
204  _parent.increment_thread_count();
205  _parent.bite_server(_ids, _delinquents, this);
206  _parent.decrement_thread_count();
207  }
208 
209 private:
210  cromp_client_tester &_parent;
211  structures::set<octopus_request_id> _ids; // the ids for commands we've sent.
212  structures::set<octopus_request_id> _delinquents; // missing ids during rcv.
213 };
214 
216 
217 //hmmm: next stop; inject the types of items they're expecting in grab_items.
218 
219 class grabby_thread : public ethread
220 {
221 public:
222  grabby_thread(cromp_client_tester &parent)
223  : ethread(), _parent(parent) {}
224 
225  void perform_activity(void *formal(ptr)) {
226  while (!should_stop()) {
227  _parent.grab_items();
228  if (_rando.inclusive(0, 100) > 10)
229  time_control::sleep_ms(_rando.inclusive(5, 38));
230  }
231  }
232 
233 private:
234  cromp_client_tester &_parent;
235  chaos _rando;
236 };
237 
239 
240 cromp_client_tester::cromp_client_tester()
242 //"cromp_client_tester"),
243  _uplink(NULL_POINTER),
244  _lock(new mutex),
245  _threads_active(0),
246  _finished_loops(0.0),
247  _encryption(false),
248  _send_count(0),
249  _thread_count(0),
250  _grabber_count(0),
251  _send_tries(0),
252  _checkpoint_count(0),
253  _dataseg_size(0),
254  _report_interval(0),
255  _snooze_duration(0),
256  _rpc_style(false),
257  _reconstruct_object(false),
258  _server_loc()
259 {
260  FUNCDEF("constructor");
261  LOG("");
262  LOG("");
263 
265 //LOG(a_sprintf("argc is %d and first is %s", application::_global_argc, application::_global_argv[0]));
266 
267  int indy = 0;
268  if (args.find("help", indy, false)
269  || (args.find("?", indy, false))
270  || (args.find('?', indy, false)) ) {
272  exit(0);
273  }
274 
275  // check for a port on the command line.
276  astring port_text;
277  int port = 5678;
278  if (args.get_value("port", port_text, false)) {
279  LOG(astring("using port: ") + port_text);
280  port = port_text.convert(5678);
281  }
282  _server_loc.port = port;
283 
284 //hmmm:normalize host so this can take either name or IP.
285 
286  indy = 0;
287  if (args.find("encrypt", indy, false) || (args.find('e', indy, true)) ) {
288  // they're saying that we should encrypt the communication.
289  LOG("turning on encryption.");
290  _encryption = true;
291  }
292 
293  indy = 0;
294  if (args.find("rpc", indy, false) || (args.find('R', indy, true)) ) {
295  // this is telling us to turn on RPC mode. we will make each request and
296  // reply pair synchronous, i.e., each reply will be awaited for when a
297  // request has been made.
298  LOG("turning on RPC style requests.");
299  _rpc_style = true;
300  }
301 
302  // check for a hostname on the command line.
303  astring hostname("local");
304  astring host_temp;
305  if (args.get_value("host", host_temp, false)) {
306  LOG(astring("using host: ") + host_temp);
307  hostname = host_temp;
308  }
309 LOG(astring("using host: ") + hostname);
310  strcpy(_server_loc.hostname, hostname.s());
311 
312  astring send_temp;
313  int send_count = MAXIMUM_SENDS;
314  if (args.get_value("sends", send_temp, false)) {
315  LOG(astring("using send count: ") + send_temp);
316  send_count = send_temp.convert(send_count);
317  if (send_count <= 0) send_count = 1;
318  }
319  _send_count = send_count;
320 
321  astring thread_temp;
322  int thread_count = NUMBER_OF_THREADS;
323  if (args.get_value("threads", thread_temp, false)) {
324  LOG(astring("using thread count: ") + thread_temp);
325  thread_count = thread_temp.convert(thread_count);
326  if (thread_count <= 0) thread_count = 1;
327  }
328  _thread_count = thread_count;
329 
330  astring grabber_temp;
331  int grabber_count = GRABBER_THREADS;
332  if (args.get_value("grab", grabber_temp, false)) {
333  LOG(astring("using grabber count: ") + grabber_temp);
334  grabber_count = grabber_temp.convert(grabber_count);
335  if (grabber_count < 0) grabber_count = 0;
336  }
337  _grabber_count = grabber_count;
338 
339  astring send_tries_temp;
340  int send_tries = MAX_SEND_TRIES;
341  if (args.get_value("trysend", send_tries_temp, false)) {
342  LOG(astring("using send tries: ") + send_tries_temp);
343  send_tries = send_tries_temp.convert(send_tries);
344  if (send_tries < 0) send_tries = 0;
345  }
346  _send_tries = send_tries;
347 
348 //hmmm: how tiresome. how about a macro here? could help in general
349 // with command_line also.
350 
351  astring checkpoint_temp;
352  int checkpoint_count = CHECKPOINT_SIZE;
353  if (args.get_value("print", checkpoint_temp, false)) {
354  LOG(astring("using checkpoint count: ") + checkpoint_temp);
355  checkpoint_count = checkpoint_temp.convert(checkpoint_count);
356  if (checkpoint_count <= 0) checkpoint_count = 1;
357  }
358  _checkpoint_count = checkpoint_count;
359 
360  astring dataseg_temp;
361  int dataseg_size = DATA_SEGMENT_SIZE;
362  if (args.get_value("dataseg", dataseg_temp, false)) {
363  LOG(astring("using dataseg size: ") + dataseg_temp);
364  dataseg_size = dataseg_temp.convert(dataseg_size);
365  if (dataseg_size < 0) dataseg_size = 0;
366  }
367  _dataseg_size = dataseg_size;
368 
369  astring report_temp;
370  int report_interval = REPORTING_INTERVAL;
371  if (args.get_value("report", report_temp, false)) {
372  LOG(astring("using report interval: ") + report_temp);
373  report_interval = report_temp.convert(report_interval);
374  if (report_interval <= 0) report_interval = 1;
375  report_interval *= SECOND_ms; // convert to milliseconds.
376  }
377  _report_interval = report_interval;
378 
379  astring snooze_temp;
380  int snooze_duration = 0; // no snooze by default.
381  if (args.get_value("snooze", snooze_temp, false)) {
382  LOG(astring("using snooze duration: ") + snooze_temp);
383  snooze_duration = snooze_temp.convert(snooze_duration);
384  if (snooze_duration < 0) snooze_duration = 0;
385  }
386  _snooze_duration = snooze_duration;
387 
388  if (args.find("reconstruct", indy, false)) {
389  LOG("saw reconstruct flag; will periodically tear down object.");
390  _reconstruct_object = true;
391  }
392 
393 LOG(astring("opening at ") + _server_loc.text_form());
394  _uplink = new cromp_client(_server_loc);
395 
396  _uplink->add_tentacle(new bubbles_tentacle(false));
397 //we don't need backgrounding right now.
398 }
399 
400 cromp_client_tester::~cromp_client_tester()
401 {
402  WHACK(_lock);
403  WHACK(_uplink);
404 }
405 
407 {
409  log(a_sprintf("%s usage:", name.s()));
410  log(astring(""));
411  log(a_sprintf("\
412 This program connects to a cromp test server and exchanges packets to test\n\
413 the performance of the cromp protocol. All command line flags are optional\n\
414 but can be added to specify how the test should be performed. Currently,\n\
415 the valid options are:\n\
416  --help\tShow this set of command-line help.\n\
417  -?\t\tditto.\n\
418  --port N\tConnect to the server on the port specified.\n\
419  --host X\tConnect to server at IP address or hostname X.\n\
420  --encrypt\tEncrypt the connection. Server must do this also.\n\
421  -e\t\tditto.\n\
422  --sends N\tThe number of sends to perform.\n\
423  --threads N\tNumber of threads competing for single cromp link.\n\
424  --grab N\tNumber of additional threads stressing retrievals.\n\
425  --trysend N\tCount of tries for sending if not all data went out.\n\
426  --print N\tItems handled in between showing send counter.\n\
427  --dataseg N\tSize of extra data packed in each test packet.\n\
428  --report N\tDuration of time between reports, in seconds.\n\
429  --snooze N\tSleep N ms between each send; this invalidates timing info.\n\
430  --rpc\tEmulate Remote Procedure Call by awaiting each response.\n\
431  -R\t\tditto\n\
432 "));
433  return -3;
434 }
435 
436 void cromp_client_tester::look_for_receipts(int count,
438  structures::set<octopus_request_id> &delinquents, bool wait)
439 {
440  FUNCDEF("look_for_receipts");
441  infoton *received = NULL_POINTER;
442  while (count--) {
443  if (!ids.length()) break; // nothing to check on.
444  octopus_request_id the_id = ids[0];
445  ids.zap(0, 0); // take out the one we're inspecting right now.
446 
447  time_stamp start_acquire;
448  int delay = MAXIMUM_ACQUISITION_DELAY;
449  if (wait) delay = 2 * MINUTE_ms; // force a long delay.
450  outcome ret = _uplink->acquire(received, the_id, delay);
451  int acquire_duration = int(time_stamp().value() - start_acquire.value());
452  if (acquire_duration >= MAXIMUM_ACQUISITION_DELAY - 1) {
453  LOG("passed time limit for acquire! this is the faux dual-cpu bug!");
454  LOG(a_sprintf("there were %d items left to acquire.", count));
455  LOG(a_sprintf("pending %d bytes to send, %d bytes accumulated.",
456  _uplink->pending_sends(), _uplink->accumulated_bytes()));
457  LOG(a_sprintf("the data bin had %d items awaiting pickup.",
458  _uplink->octo()->responses().items_held()));
459  if (ret != cromp_client::TIMED_OUT) {
460  LOG("cromp client lied about outcome?? didn't call this timed out!!");
461  }
462  }
463 
464  if (ret != cromp_client::OKAY) {
465  if (ret != cromp_client::TIMED_OUT) {
466  LOG(astring("failed to acquire the response--got error ")
467  + cromp_client::outcome_name(ret));
468  // give it another chance later.
469  ids += the_id;
470 LOG(a_sprintf("moved %s back to main id queue.", the_id.text_form().s()));
471  } else {
472  if (delinquents.member(the_id))
473  continuable_error(class_name(), func,
474  astring("a delinquent response is still missing: ")
475  + the_id.text_form());
476  // if we hadn't already seen it, we'll watch for it next time.
477  delinquents += the_id;
478 LOG(a_sprintf("added %s to delinquents.", the_id.text_form().s()));
479  }
480  return;
481  }
482 
483 if (!received) {
484 deadly_error(class_name(), func,
485 "received packet was NULL_POINTER even though outcome was OKAY!");
486 }
487 
488  // check that the right type is coming back to us.
489  bubble *cast = dynamic_cast<bubble *>(received);
490  if (!cast) {
491  continuable_error(class_name(), func, astring("got the wrong type "
492  "of response: ") + received->classifier().text_form());
493  }
494 
495  // if we had a problem with this item earlier, we remove it since it
496  // succeeded this time.
497  if (delinquents.member(the_id))
498  delinquents.remove(the_id);
499  WHACK(received);
500  }
501 }
502 
503 void cromp_client_tester::bite_server(structures::set<octopus_request_id> &ids,
505  void *originator)
506 {
507  FUNCDEF("bite_server");
508  octopus_request_id cmd_id;
509 
511 
512  outcome ret;
513 
514  double overall_sent = 0;
515 
516 //hmmm: not very interesting boundaries below, non-randomized and identical in both places.
517  const char *bounds_init[] = { "0", "120", "220", "280" };
518  string_array boundish(4, bounds_init);
519 
520  // this computes the size of the exchange object with no extra data attached.
522  bubble test_size(_dataseg_size, boundish, 238843);
523  test_size.data().reset();
524  // set the data segment to zero length.
525  test_size.pack(temp);
526  int base_length = temp.length();
527  // this is the base packed length of the bubble object.
528 
529  int failure_count = 0;
530 
531  time_stamp start; // record when our testing started.
532 
533  for (int sends = 1; sends <= _send_count; sends++) {
534  bubble to_send(_dataseg_size, boundish, 238843);
535  int curr_sending = to_send.data_length() + base_length * 2;
536  overall_sent += curr_sending;
537  // we compute the overall sent by what's sent in the request (which is
538  // of the base length plus the attached array size) and the reply (which
539  // is the base length only since the server resets the data attachment).
540  // we go ahead and count it as sent before the send, since we're going
541  // to bomb out if the send doesn't work.
542  ret = _uplink->submit(to_send, cmd_id, _send_tries);
543  switch (ret.value()) {
544  case cromp_client::OKAY: {
545  // complete success in sending that chunk out.
546  ids.add(cmd_id); // record it.
547  if (_rpc_style) {
548  // this call is used to force single requests and replies RPC style.
549  look_for_receipts(1, ids, delinquents, true);
550  }
551  // sleep if we were asked to.
552  if (_snooze_duration) {
553  _uplink->keep_alive_pause(_snooze_duration, 60);
554  look_for_receipts(1, ids, delinquents);
555  }
556  break;
557  }
558  case cromp_client::TOO_FULL: {
559 //treating as failure right now.
560 LOG("got too full outcome!");
561  sends--;
562  overall_sent -= curr_sending;
563  continue;
564  break;
565  }
566  case cromp_client::TIMED_OUT: {
567 //treating as failure right now.
568 LOG("got timed out outcome!");
569  sends--;
570  overall_sent -= curr_sending;
571  continue;
572  break;
573  }
574  default: {
575  // a failure case that we have no other handling for.
576  if (failure_count++ < 20) {
577  sends--; // skip back for the failed one.
578  overall_sent -= curr_sending; // remove unsent portion.
579  LOG(astring("got failure outcome ") + cromp_client::outcome_name(ret)
580  + " from attempt to submit request.");
581  if (_snooze_duration) {
582  _uplink->keep_alive_pause(_snooze_duration, 60);
583  }
584  continue; // try again.
585  }
586  continuable_error(class_name(), func,
587  astring("failed to submit the request--got error ")
588  + cromp_client::outcome_name(ret));
589  break;
590  }
591  }
592 
593  _finished_loops += 1.0;
594 
595  if (ids.elements() > MAXIMUM_PENDING_REQUESTS) {
596  // grab some of the items waiting. hopefully they're back by now.
597  look_for_receipts(PENDING_REQUESTS_FORCED, ids, delinquents);
598  }
599 
600  if (! (sends % _checkpoint_count)) {
601  LOG(a_sprintf("%x send #%d", originator, sends));
602  }
603  }
604  LOG(a_sprintf("%x final send #%d", originator, _send_count));
605 
608 
609  look_for_receipts(ids.elements(), ids, delinquents);
610 
611  LOG(a_sprintf("concluded %d test requests and responses.", _send_count));
612 }
613 
614 void cromp_client_tester::grab_items()
615 {
616  FUNCDEF("grab_items");
617  octopus_request_id id(_uplink->entity(), -12);
618  // look for an id we don't expect to have any thing waiting for.
619  infoton *found = NULL_POINTER;
620  outcome ret = _uplink->retrieve_and_restore(found, id, 0);
621  WHACK(found);
622 }
623 
624 void cromp_client_tester::report(const time_stamp &start_time,
625  double bytes_transmitted, double conversations)
626 {
627  FUNCDEF("report");
628  throughput_counter bandwidth; // calculator for communication speed.
629  double duration = time_stamp().value() - start_time.value();
630  // the elapsed duration so far.
631  bandwidth.add_run(bytes_transmitted, duration, conversations * 2);
632  // create a portrait of how the run has progressed. we multiply the
633  // conversations by two since we are counting both the request and the
634  // response (send and receive) as a transfer.
635 
636  // calculate the number of bytes per item for real as it plays out in
637  // cromp sending.
638  double bytes_per_item = bandwidth.bytes_sent() / bandwidth.number_of_sends();
639 
640  bubble my_bubble(_dataseg_size); // an exemplar for our sends.
641 
642  // calculate how much space bubble's naming takes up.
643  byte_array packed_classifier;
644  structures::pack_array(packed_classifier, my_bubble.classifier());
645  double classifier_size = packed_classifier.length() - sizeof(int);
646  // that's how much space is used by our goofy classifier name. there are
647  // a few bytes extra overhead for packing a string array and we remove
648  // them from consideration; we only want credit for the name, since that
649  // is not truly overhead, given that the bubble infoton chose it.
650  double payload_portion = my_bubble.packed_size() + classifier_size;
651  // calculate the portion of our transmissions that are solely the
652  // result of what we are putting into the package.
653  double overhead = bytes_per_item - payload_portion;
654  // okay, this is how many bytes per item is cromp noise, rather than
655  // something the user is responsible for.
656  double percent_overhead = overhead / bytes_per_item;
657 
658 // change 0 to 1 to enable this section of information.
659 #if 0
660  // get additional facts about how much of a packed infoton is wasted.
661  byte_array packed_infote;
662  infoton::fast_pack(packed_infote, my_bubble);
663  log(a_sprintf("sane? -- overhead for just packed infoton is %d bytes.",
664  packed_infote.length() - payload_portion));
665  octopus_request_id example_request(_uplink->entity(), 23982);
666  byte_array packed_req_id;
667  example_request.pack(packed_req_id);
668  log(a_sprintf(" -- overhead for octo request id is %d bytes.",
669  packed_req_id.length()));
670  byte_array packed_transa;
671  cromp_transaction::flatten(packed_transa, my_bubble,
672  octopus_request_id(_uplink->entity(), 23982));
673  log(a_sprintf(" -- overhead for cromp transation is %d bytes.",
674  packed_transa.length() - payload_portion));
675 #endif
676 
677  LOG(a_sprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
678  "payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
679  "in %.2f seconds is %f ms/item%s"
680  "at %.2f %cb/sec & %.2f items/sec.",
681  bandwidth.number_of_sends(), bandwidth.bytes_sent(),
682  bytes_per_item,
683  parser_bits::platform_eol_to_chars(),
684  payload_portion, overhead, percent_overhead * 100.0,
685  parser_bits::platform_eol_to_chars(),
686  bandwidth.total_time() / SECOND_ms,
687  bandwidth.total_time() / bandwidth.number_of_sends(),
688  parser_bits::platform_eol_to_chars(),
689  (bandwidth.kilobytes_per_second() < 1024.0?
690  bandwidth.kilobytes_per_second()
691  : bandwidth.megabytes_per_second()),
692  (bandwidth.kilobytes_per_second() < 1024.0? 'K' : 'M'),
693  bandwidth.number_of_sends() / (bandwidth.total_time() / SECOND_ms)));
694 }
695 
696 void cromp_client_tester::cause_object_reconstruction()
697 {
698  FUNCDEF("cause_object_reconstruction");
699  int rando = chaos().inclusive(1, 100);
700  if (rando > CHANCE_OF_RECONSTRUCT) return; // not doing it this time.
701  LOG(astring("reconstructing client at ") + _server_loc.text_form());
702 //below is not good when multiple threads are allowed to romp on client.
705 
706  _uplink->disconnect();
707  outcome ret = common::INVALID;
708  int counter = 100; // allowed this many times to try to reconnect.
709  while ( (ret != common::OKAY) && (counter-- >= 0) ) {
710  ret = _uplink->connect();
711  if (ret != cromp_client::OKAY) {
712  LOG(astring("couldn't reconnect this time: ")
713  + cromp_client::outcome_name(ret));
714  time_control::sleep_ms(420);
715  }
716  }
717 }
718 
719 int cromp_client_tester::execute()
720 {
721  FUNCDEF("execute");
722 
723  // testing that crompish pax are done right.
724  bubble fud(randomizer().inclusive(12, 2829));
725  byte_array packed_fud;
726  fud.pack(packed_fud);
727  if (packed_fud.length() != fud.packed_size())
728  deadly_error(class_name(), func, "bubble's packed size method is wrong.");
729 
730  if (_encryption) _uplink->enable_encryption();
731 
732  outcome ret = _uplink->connect();
733  if (ret != cromp_client::OKAY) {
734  deadly_error(class_name(), func, astring("connection failed with error: ")
735  + cromp_client::outcome_name(ret));
736  }
737 
738  thread_cabinet cab; // we store a bunch of threads here.
739 
740  LOG(a_sprintf("adding %d grabber threads to test.", _grabber_count));
741 
742  // create the extra grabber threads.
743  for (int i = 0; i < _grabber_count; i++) {
744  grabby_thread *to_add = new grabby_thread(*this);
745  cab.add_thread(to_add, false, NULL_POINTER);
746  }
747 
748  LOG(a_sprintf("adding %d transmitter threads to test.", _thread_count));
749 
750  // create the specified number of threads.
751  for (int j = 0; j < _thread_count; j++) {
752  bitey_thread *to_add = new bitey_thread(*this);
753  cab.add_thread(to_add, false, NULL_POINTER);
754  }
755 
756 //LOG("starting all threads...");
757  time_stamp start;
758  cab.start_all(NULL_POINTER);
759 //LOG("done starting threads...");
760 
761  time_control::sleep_ms(400); // wait until a few get cranked up.
762 
763 //LOG("did our initial sleep...");
764 
765  while (cab.any_running()) {
766  time_control::sleep_ms(30);
767  if (!_threads_active) {
768  break;
769  }
770 //LOG("main loop...");
771  if (time_stamp(-_report_interval) > _last_report) {
772  report(start, cromp_common::total_bytes_sent()
773  + cromp_common::total_bytes_received(),
774  _finished_loops);
775  _last_report.reset();
776  }
777  if (_reconstruct_object) {
778  cause_object_reconstruction();
779  }
780  if (!_uplink->connected()) {
781  LOG("connection dropped. trying to connect again.");
782  outcome ret = _uplink->connect();
783  if (ret != cromp_client::OKAY) {
784  // snooze a bit so as not to drive server crazy or log too much noise.
785  time_control::sleep_ms(10 * SECOND_ms);
786  }
787  }
788  }
789 
790  LOG("- done testing -");
791 
792  if (_finished_loops != double(_thread_count) * _send_count)
793  LOG(a_sprintf("number of loops was calculated differently: wanted %d, "
794  "got %d", _thread_count * _send_count, _finished_loops));
795 
796  report(start, cromp_common::total_bytes_sent()
797  + cromp_common::total_bytes_received(),
798  _thread_count * _send_count);
799 
800 //LOG("stopping all threads...");
801  cab.stop_all();
802  LOG("all threads exited.");
803 
804 #ifdef DEBUG_TESTER
807 #endif
808 
809  LOG("works for those functions tested.");
810 
811  return 0;
812 }
813 
815 
816 HOOPLE_MAIN(cromp_client_tester, )
817 
int print_instructions(bool good, const astring &program_name)
Definition: checker.cpp:45
The application_shell is a base object for console programs.
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
int length() const
Returns the current reported length of the allocated C array.
Definition: array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition: array.h:769
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
Definition: astring.h:113
int convert(int default_value) const
Converts the string into a corresponding integer.
Definition: astring.cpp:760
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
A very common template for a dynamic array of bytes.
Definition: byte_array.h:36
Outcomes describe the state of completion for an operation.
Definition: outcome.h:31
int value() const
Definition: outcome.h:51
Provides operations commonly needed on file names.
Definition: filename.h:64
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
Definition: filename.cpp:97
filename basename() const
returns the base of the filename; no directory.
Definition: filename.cpp:385
a platform-independent way to acquire random numbers in a specific range.
Definition: chaos.h:51
int inclusive(int low, int high) const
< Returns a pseudo-random number r, such that "low" <= r <= "high".
Definition: chaos.h:88
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition: infoton.cpp:85
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
basis::astring text_form() const
human readable form of the request.
Provides a platform-independent object for adding threads to a program.
Definition: ethread.h:36
Manages a collection of threads.
bool any_running() const
returns true if any threads are currently running.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void start_all(void *pointer)
cranks up any threads that are not already running.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
Reports on average bandwidth of the transfers being measured.
double kilobytes_per_second() const
returns the number of kilobytes that transfers are getting per second.
double bytes_sent() const
returns the number of bytes sent so far.
double total_time() const
the run time so far, in milliseconds.
double number_of_sends() const
returns the number of sends that have occurred.
double megabytes_per_second() const
returns the number of megabytes that transfers are getting per second.
void add_run(double size_of_send, double time_of_send, double number_of_runs=1.0)
records a run without changing the state of the current run.
int elements() const
Returns the number of elements in this set.
Definition: set.h:47
bool member(const contents &to_test) const
Returns true if the item "to_test" is a member of this set.
Definition: set.h:223
bool add(const contents &to_add)
Adds a new element "to_add" to the set.
Definition: set.h:232
bool remove(const contents &to_remove)
Removes the item "to_remove" from the set.
Definition: set.h:249
An array of strings with some additional helpful methods.
Definition: string_array.h:32
basis::astring text_form() const
A synonym for the text_format() method.
Definition: string_array.h:71
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
time_representation value() const
returns the time_stamp in terms of the lower level type.
Definition: time_stamp.h:61
#define continuable_error(c, f, i)
#define deadly_error(c, f, i)
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:54
Provides macros that implement the 'main' program of an application.
#define HOOPLE_MAIN(obj_name, obj_args)
options that should work for most unix and linux apps.
Definition: hoople_main.h:61
Implements an application lock to ensure only one is running at once.
char ** _global_argv
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
const int MINUTE_ms
Number of milliseconds in a minute.
Definition: definitions.h:121
A platform independent way to obtain the timestamp of a file.
Definition: byte_filer.cpp:37
A logger that sends to the console screen using the standard output device.
An extension to floating point primitives providing approximate equality.
Definition: averager.h:21
Provides access to the operating system's socket methods.
Definition: base_address.h:26
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
void pack_array(basis::byte_array &packed_form, const basis::array< contents > &to_pack)
provides a way to pack any array that stores packable objects.
#include <time.h>
Definition: earth_time.cpp:37
Useful support functions for unit testing, especially within hoople.
Definition: unit_base.cpp:35
#define randomizer()
const int MAX_SEND_TRIES
const int MAXIMUM_ACQUISITION_DELAY
const int CHECKPOINT_SIZE
#define LOG(s)
const int PENDING_REQUESTS_FORCED
const int DATA_SEGMENT_SIZE
const int CHANCE_OF_RECONSTRUCT
const int GRABBER_THREADS
const int MAXIMUM_SENDS
const int NUMBER_OF_THREADS
const int REPORTING_INTERVAL
const int MAXIMUM_PENDING_REQUESTS
chaos rando