44 using namespace basis;
46 using namespace cromp;
59 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s))
129 cromp_client_tester();
130 ~cromp_client_tester();
132 virtual int execute();
148 void cause_object_reconstruction();
151 void increment_thread_count() {
152 FUNCDEF(
"increment_thread_count");
158 void decrement_thread_count() {
159 FUNCDEF(
"decrement_thread_count");
165 void report(
const time_stamp &start_time,
double bytes_transmitted,
166 double conversations);
175 double _finished_loops;
181 int _checkpoint_count;
183 int _report_interval;
184 int _snooze_duration;
186 bool _reconstruct_object;
196 class bitey_thread :
public ethread
199 bitey_thread(cromp_client_tester &parent)
200 :
ethread(), _parent(parent) {}
202 void perform_activity(
void *
formal(ptr)) {
204 _parent.increment_thread_count();
205 _parent.bite_server(_ids, _delinquents,
this);
206 _parent.decrement_thread_count();
210 cromp_client_tester &_parent;
219 class grabby_thread :
public ethread
222 grabby_thread(cromp_client_tester &parent)
223 :
ethread(), _parent(parent) {}
225 void perform_activity(
void *
formal(ptr)) {
226 while (!should_stop()) {
227 _parent.grab_items();
228 if (_rando.inclusive(0, 100) > 10)
229 time_control::sleep_ms(_rando.inclusive(5, 38));
234 cromp_client_tester &_parent;
240 cromp_client_tester::cromp_client_tester()
246 _finished_loops(0.0),
252 _checkpoint_count(0),
257 _reconstruct_object(false),
268 if (args.find(
"help", indy,
false)
269 || (args.find(
"?", indy,
false))
270 || (args.find(
'?', indy,
false)) ) {
278 if (args.get_value(
"port", port_text,
false)) {
280 port = port_text.
convert(5678);
282 _server_loc.port = port;
287 if (args.find(
"encrypt", indy,
false) || (args.find(
'e', indy,
true)) ) {
289 LOG(
"turning on encryption.");
294 if (args.find(
"rpc", indy,
false) || (args.find(
'R', indy,
true)) ) {
298 LOG(
"turning on RPC style requests.");
305 if (args.get_value(
"host", host_temp,
false)) {
307 hostname = host_temp;
310 strcpy(_server_loc.hostname, hostname.s());
314 if (args.get_value(
"sends", send_temp,
false)) {
315 LOG(
astring(
"using send count: ") + send_temp);
316 send_count = send_temp.
convert(send_count);
317 if (send_count <= 0) send_count = 1;
319 _send_count = send_count;
323 if (args.get_value(
"threads", thread_temp,
false)) {
324 LOG(
astring(
"using thread count: ") + thread_temp);
325 thread_count = thread_temp.
convert(thread_count);
326 if (thread_count <= 0) thread_count = 1;
328 _thread_count = thread_count;
332 if (args.get_value(
"grab", grabber_temp,
false)) {
333 LOG(
astring(
"using grabber count: ") + grabber_temp);
334 grabber_count = grabber_temp.
convert(grabber_count);
335 if (grabber_count < 0) grabber_count = 0;
337 _grabber_count = grabber_count;
341 if (args.get_value(
"trysend", send_tries_temp,
false)) {
342 LOG(
astring(
"using send tries: ") + send_tries_temp);
343 send_tries = send_tries_temp.
convert(send_tries);
344 if (send_tries < 0) send_tries = 0;
346 _send_tries = send_tries;
353 if (args.get_value(
"print", checkpoint_temp,
false)) {
354 LOG(
astring(
"using checkpoint count: ") + checkpoint_temp);
355 checkpoint_count = checkpoint_temp.
convert(checkpoint_count);
356 if (checkpoint_count <= 0) checkpoint_count = 1;
358 _checkpoint_count = checkpoint_count;
362 if (args.get_value(
"dataseg", dataseg_temp,
false)) {
363 LOG(
astring(
"using dataseg size: ") + dataseg_temp);
364 dataseg_size = dataseg_temp.
convert(dataseg_size);
365 if (dataseg_size < 0) dataseg_size = 0;
367 _dataseg_size = dataseg_size;
371 if (args.get_value(
"report", report_temp,
false)) {
372 LOG(
astring(
"using report interval: ") + report_temp);
373 report_interval = report_temp.
convert(report_interval);
374 if (report_interval <= 0) report_interval = 1;
377 _report_interval = report_interval;
380 int snooze_duration = 0;
381 if (args.get_value(
"snooze", snooze_temp,
false)) {
382 LOG(
astring(
"using snooze duration: ") + snooze_temp);
383 snooze_duration = snooze_temp.
convert(snooze_duration);
384 if (snooze_duration < 0) snooze_duration = 0;
386 _snooze_duration = snooze_duration;
388 if (args.find(
"reconstruct", indy,
false)) {
389 LOG(
"saw reconstruct flag; will periodically tear down object.");
390 _reconstruct_object =
true;
400 cromp_client_tester::~cromp_client_tester()
412 This program connects to a cromp test server and exchanges packets to test\n\
413 the performance of the cromp protocol. All command line flags are optional\n\
414 but can be added to specify how the test should be performed. Currently,\n\
415 the valid options are:\n\
416 --help\tShow this set of command-line help.\n\
418 --port N\tConnect to the server on the port specified.\n\
419 --host X\tConnect to server at IP address or hostname X.\n\
420 --encrypt\tEncrypt the connection. Server must do this also.\n\
422 --sends N\tThe number of sends to perform.\n\
423 --threads N\tNumber of threads competing for single cromp link.\n\
424 --grab N\tNumber of additional threads stressing retrievals.\n\
425 --trysend N\tCount of tries for sending if not all data went out.\n\
426 --print N\tItems handled in between showing send counter.\n\
427 --dataseg N\tSize of extra data packed in each test packet.\n\
428 --report N\tDuration of time between reports, in seconds.\n\
429 --snooze N\tSleep N ms between each send; this invalidates timing info.\n\
430 --rpc\tEmulate Remote Procedure Call by awaiting each response.\n\
436 void cromp_client_tester::look_for_receipts(
int count,
450 outcome ret = _uplink->acquire(received, the_id, delay);
451 int acquire_duration = int(
time_stamp().value() - start_acquire.
value());
453 LOG(
"passed time limit for acquire! this is the faux dual-cpu bug!");
454 LOG(
a_sprintf(
"there were %d items left to acquire.", count));
455 LOG(
a_sprintf(
"pending %d bytes to send, %d bytes accumulated.",
456 _uplink->pending_sends(), _uplink->accumulated_bytes()));
457 LOG(
a_sprintf(
"the data bin had %d items awaiting pickup.",
458 _uplink->octo()->responses().items_held()));
459 if (ret != cromp_client::TIMED_OUT) {
460 LOG(
"cromp client lied about outcome?? didn't call this timed out!!");
464 if (ret != cromp_client::OKAY) {
465 if (ret != cromp_client::TIMED_OUT) {
466 LOG(
astring(
"failed to acquire the response--got error ")
467 + cromp_client::outcome_name(ret));
472 if (delinquents.
member(the_id))
474 astring(
"a delinquent response is still missing: ")
477 delinquents += the_id;
485 "received packet was NULL_POINTER even though outcome was OKAY!");
497 if (delinquents.
member(the_id))
498 delinquents.
remove(the_id);
514 double overall_sent = 0;
517 const char *bounds_init[] = {
"0",
"120",
"220",
"280" };
522 bubble test_size(_dataseg_size, boundish, 238843);
523 test_size.data().reset();
525 test_size.pack(
temp);
526 int base_length =
temp.length();
529 int failure_count = 0;
533 for (
int sends = 1; sends <= _send_count; sends++) {
534 bubble to_send(_dataseg_size, boundish, 238843);
535 int curr_sending = to_send.data_length() + base_length * 2;
536 overall_sent += curr_sending;
542 ret = _uplink->submit(to_send, cmd_id, _send_tries);
543 switch (ret.
value()) {
544 case cromp_client::OKAY: {
549 look_for_receipts(1, ids, delinquents,
true);
552 if (_snooze_duration) {
553 _uplink->keep_alive_pause(_snooze_duration, 60);
554 look_for_receipts(1, ids, delinquents);
558 case cromp_client::TOO_FULL: {
560 LOG(
"got too full outcome!");
562 overall_sent -= curr_sending;
566 case cromp_client::TIMED_OUT: {
568 LOG(
"got timed out outcome!");
570 overall_sent -= curr_sending;
576 if (failure_count++ < 20) {
578 overall_sent -= curr_sending;
579 LOG(
astring(
"got failure outcome ") + cromp_client::outcome_name(ret)
580 +
" from attempt to submit request.");
581 if (_snooze_duration) {
582 _uplink->keep_alive_pause(_snooze_duration, 60);
587 astring(
"failed to submit the request--got error ")
588 + cromp_client::outcome_name(ret));
593 _finished_loops += 1.0;
600 if (! (sends % _checkpoint_count)) {
604 LOG(
a_sprintf(
"%x final send #%d", originator, _send_count));
609 look_for_receipts(ids.
elements(), ids, delinquents);
611 LOG(
a_sprintf(
"concluded %d test requests and responses.", _send_count));
614 void cromp_client_tester::grab_items()
620 outcome ret = _uplink->retrieve_and_restore(found,
id, 0);
624 void cromp_client_tester::report(
const time_stamp &start_time,
625 double bytes_transmitted,
double conversations)
631 bandwidth.
add_run(bytes_transmitted, duration, conversations * 2);
640 bubble my_bubble(_dataseg_size);
645 double classifier_size = packed_classifier.
length() -
sizeof(int);
650 double payload_portion = my_bubble.packed_size() + classifier_size;
653 double overhead = bytes_per_item - payload_portion;
656 double percent_overhead = overhead / bytes_per_item;
662 infoton::fast_pack(packed_infote, my_bubble);
663 log(
a_sprintf(
"sane? -- overhead for just packed infoton is %d bytes.",
664 packed_infote.
length() - payload_portion));
667 example_request.pack(packed_req_id);
668 log(
a_sprintf(
" -- overhead for octo request id is %d bytes.",
671 cromp_transaction::flatten(packed_transa, my_bubble,
673 log(
a_sprintf(
" -- overhead for cromp transation is %d bytes.",
674 packed_transa.
length() - payload_portion));
677 LOG(
a_sprintf(
"sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
678 "payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
679 "in %.2f seconds is %f ms/item%s"
680 "at %.2f %cb/sec & %.2f items/sec.",
683 parser_bits::platform_eol_to_chars(),
684 payload_portion, overhead, percent_overhead * 100.0,
685 parser_bits::platform_eol_to_chars(),
688 parser_bits::platform_eol_to_chars(),
696 void cromp_client_tester::cause_object_reconstruction()
698 FUNCDEF(
"cause_object_reconstruction");
706 _uplink->disconnect();
709 while ( (ret != common::OKAY) && (counter-- >= 0) ) {
710 ret = _uplink->connect();
711 if (ret != cromp_client::OKAY) {
713 + cromp_client::outcome_name(ret));
714 time_control::sleep_ms(420);
719 int cromp_client_tester::execute()
726 fud.pack(packed_fud);
727 if (packed_fud.
length() != fud.packed_size())
728 deadly_error(class_name(), func,
"bubble's packed size method is wrong.");
730 if (_encryption) _uplink->enable_encryption();
732 outcome ret = _uplink->connect();
733 if (ret != cromp_client::OKAY) {
735 + cromp_client::outcome_name(ret));
740 LOG(
a_sprintf(
"adding %d grabber threads to test.", _grabber_count));
743 for (
int i = 0; i < _grabber_count; i++) {
744 grabby_thread *
to_add =
new grabby_thread(*
this);
748 LOG(
a_sprintf(
"adding %d transmitter threads to test.", _thread_count));
751 for (
int j = 0; j < _thread_count; j++) {
752 bitey_thread *
to_add =
new bitey_thread(*
this);
761 time_control::sleep_ms(400);
766 time_control::sleep_ms(30);
767 if (!_threads_active) {
771 if (
time_stamp(-_report_interval) > _last_report) {
772 report(start, cromp_common::total_bytes_sent()
773 + cromp_common::total_bytes_received(),
775 _last_report.reset();
777 if (_reconstruct_object) {
778 cause_object_reconstruction();
780 if (!_uplink->connected()) {
781 LOG(
"connection dropped. trying to connect again.");
782 outcome ret = _uplink->connect();
783 if (ret != cromp_client::OKAY) {
790 LOG(
"- done testing -");
792 if (_finished_loops !=
double(_thread_count) * _send_count)
793 LOG(
a_sprintf(
"number of loops was calculated differently: wanted %d, "
794 "got %d", _thread_count * _send_count, _finished_loops));
796 report(start, cromp_common::total_bytes_sent()
797 + cromp_common::total_bytes_received(),
798 _thread_count * _send_count);
802 LOG(
"all threads exited.");
809 LOG(
"works for those functions tested.");
int print_instructions(bool good, const astring &program_name)
The application_shell is a base object for console programs.
a_sprintf is a specialization of astring that provides printf style support.
int length() const
Returns the current reported length of the allocated C array.
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Provides a dynamically resizable ASCII character string.
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
int convert(int default_value) const
Converts the string into a corresponding integer.
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
auto_synchronizer simplifies concurrent code by automatically unlocking.
A very common template for a dynamic array of bytes.
Outcomes describe the state of completion for an operation.
Provides operations commonly needed on file names.
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
filename basename() const
returns the base of the filename; no directory.
a platform-independent way to acquire random numbers in a specific range.
int inclusive(int low, int high) const
< Returns a pseudo-random number r, such that "low" <= r <= "high".
An infoton is an individual request parcel with accompanying information.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
Provides a platform-independent object for adding threads to a program.
Manages a collection of threads.
bool any_running() const
returns true if any threads are currently running.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void start_all(void *pointer)
cranks up any threads that are not already running.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
Reports on average bandwidth of the transfers being measured.
double kilobytes_per_second() const
returns the number of kilobytes that transfers are getting per second.
double bytes_sent() const
returns the number of bytes sent so far.
double total_time() const
the run time so far, in milliseconds.
double number_of_sends() const
returns the number of sends that have occurred.
double megabytes_per_second() const
returns the number of megabytes that transfers are getting per second.
void add_run(double size_of_send, double time_of_send, double number_of_runs=1.0)
records a run without changing the state of the current run.
int elements() const
Returns the number of elements in this set.
bool member(const contents &to_test) const
Returns true if the item "to_test" is a member of this set.
bool add(const contents &to_add)
Adds a new element "to_add" to the set.
bool remove(const contents &to_remove)
Removes the item "to_remove" from the set.
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
Represents a point in time relative to the operating system startup time.
time_representation value() const
returns the time_stamp in terms of the lower level type.
#define continuable_error(c, f, i)
#define deadly_error(c, f, i)
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
#define NULL_POINTER
The value representing a pointer to nothing.
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Provides macros that implement the 'main' program of an application.
#define HOOPLE_MAIN(obj_name, obj_args)
options that should work for most unix and linux apps.
Implements an application lock to ensure only one is running at once.
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
const int SECOND_ms
Number of milliseconds in a second.
const int MINUTE_ms
Number of milliseconds in a minute.
A platform independent way to obtain the timestamp of a file.
A logger that sends to the console screen using the standard output device.
An extension to floating point primitives providing approximate equality.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.
void pack_array(basis::byte_array &packed_form, const basis::array< contents > &to_pack)
provides a way to pack any array that stores packable objects.
Useful support functions for unit testing, especially within hoople.
const int MAXIMUM_ACQUISITION_DELAY
const int CHECKPOINT_SIZE
const int PENDING_REQUESTS_FORCED
const int DATA_SEGMENT_SIZE
const int CHANCE_OF_RECONSTRUCT
const int GRABBER_THREADS
const int NUMBER_OF_THREADS
const int REPORTING_INTERVAL
const int MAXIMUM_PENDING_REQUESTS