note description: "An aranea node that sends messges for profiling and analyzes the results" author: "Comerge AG, DR" date: "$Date$" revision: "$Revision$" class SENDER_NODE inherit AP_PROFILING_NODE redefine main, make_profiling_node, clean_results end PROFILING_CONSTANTS -- DT_SHARED_SYSTEM_CLOCK KL_SHARED_EXCEPTIONS rename exceptions as kl_exceptions end PROFILING_UTILS create make_profiling_node feature -- Creation make_profiling_node -- Run application. do parse_command_line print("starting sender%N") Precursor end parse_command_line -- parse the passed options local parser: AP_PARSER do create parser.make parser.set_application_description ("This is the aranea profiling application used to measure the performance of the transport bus") -- create start_sender_flag.make ('s', "start_sender") -- start_sender_flag.set_description ("Starts the sender node and performs analysis afterwards") -- parser.options.force_last (start_sender_flag) -- create start_receiver_flag.make ('r', "start_receiver") -- start_receiver_flag.set_description ("Starts the receiver node") -- parser.options.force_last (start_receiver_flag) create repetitions_option.make ('r', "repetitions") repetitions_option.set_description ("The nr of messages to send (if not specified the default is used)") repetitions_option.set_parameter_description ("NR_OF_REPETITIONS") parser.options.force_last (repetitions_option) create message_size_option.make ('s', "message_size") message_size_option.set_description ("The message payload size (if not specified the default content will be used)") message_size_option.set_parameter_description ("NR_OF_CHARACTERS") parser.options.force_last (message_size_option) create write_result_files_flag.make ('f', "write_result_files") write_result_files_flag.set_description ("Write profiling results to files") parser.options.force_last (write_result_files_flag) parser.parse_arguments end feature {NONE} -- Implementation remaining_messages: INTEGER -- the nr of remaining messages to send message_length: INTEGER -- The message payload length or -1 if not specified once if message_size_option.was_found then Result := message_size_option.parameter.to_integer else Result := -1 end end repetitions: INTEGER -- The nr. of messages to send or the default value if not specified once if repetitions_option.was_found then Result := repetitions_option.parameter else Result := repetitions_default end end send_profiling_messages -- start sending messages do remaining_messages := repetitions print("sender: sending " + repetitions.out + " profiling messges with length " + message_length.out + "...%N") send_profiling_message end send_profiling_message -- sends a single profiling message local l_msg: A_GENERAL_STRING_MESSAGE l_timestamp: INTEGER_64 l_payload: STRING do print ("sender: sending next profiling msg (" + remaining_messages.out + " remaining)%N") if message_length /= -1 then l_payload := create_message_payload (message_length) else l_payload := default_message_payload end create l_msg.make_from_string (l_payload) l_msg.set_reply_handler (agent handle_profiling_reply) -- compute timestamp before and after sending the message l_timestamp := compute_timestamp_now app_timestamps.force_last (l_timestamp) send_message (l_msg, aranea_prefix + echo_node_name) -- main_jobs.execute_all -- proccessing the reply is done in the main loop (we now that is the only job to execute) end request_results -- request the measurements from the receiver local l_msg: AP_PROFILING_RESULTS_REQUEST_MESSAGE do print ("all profiling msgs sent. requesting results from receiver%N") create l_msg.make l_msg.set_reply_handler (agent handle_results) send_message (l_msg, aranea_prefix + echo_node_name) end handle_profiling_reply (a_msg: A_MESSAGE) -- Handle the reply msg local l_timestamp: INTEGER_64 do -- measure time and book keeping l_timestamp := compute_timestamp_now app_timestamps.force_last (l_timestamp) print("sender: received profiling reply%N") remaining_messages := remaining_messages - 1 if remaining_messages > 0 then -- send another profiling message -- wait some time to calm activemq down (create {EXECUTION_ENVIRONMENT}).sleep (sleep_time) send_profiling_message else -- request measurements from the receiver node request_results end end handle_results (a_msg: A_MESSAGE) -- results have been received, perform the analysis -- the receiver sends back a list of timestamps local l_msg: AP_PROFILING_RESULTS_REPLY_MESSAGE do l_msg ?= a_msg -- start analysis print("converted receiver stamps: ") print_list (deserialize_timestamp_list (l_msg.app_timestamps)) print("sender stamps: ") print_list (app_timestamps) analyze (l_msg) end analyze (a_msg: AP_PROFILING_RESULTS_REPLY_MESSAGE) -- Analyze the measurements local l_receiver_app_timestamps, l_receiver_ems_timestamps: DS_ARRAYED_LIST [INTEGER_64] l_app_result, l_ems_result, l_result: AP_PROFILING_RESULT l_results: DS_ARRAYED_LIST [AP_PROFILING_RESULT] do l_receiver_app_timestamps := deserialize_timestamp_list (a_msg.app_timestamps) l_receiver_ems_timestamps := deserialize_timestamp_list (a_msg.ems_timestamps) clean_results if app_timestamps.count /= l_receiver_app_timestamps.count then -- something is not right here... print ("cannot compute analysis: Nr. of app timestamps from sender and receiver do not match!") Exceptions.die (-1) elseif ems_timestamps.count /= l_receiver_ems_timestamps.count then print ("cannot compute analysis: Nr. of ems timestamps from sender and receiver do not match!") Exceptions.die (-1) end -- put all the results in a container create l_results.make_default create l_app_result.make (app_timestamps, l_receiver_app_timestamps, "app") l_results.force_last (l_app_result) create l_ems_result.make (ems_timestamps, l_receiver_ems_timestamps, "ems") l_results.force_last (l_ems_result) -- perform the analysis from l_results.start until l_results.off loop l_result := l_results.item_for_iteration print_report (l_result) l_results.forth if write_result_files_flag.was_found then write_result_file (l_result.id + "-request", l_result.mean_request_transmission_time) write_result_file (l_result.id + "-response", l_result.mean_response_transmission_time) end end -- add conversion times (app - ems) if write_result_files_flag.was_found then write_result_file ("conversion-request", l_app_result.mean_request_transmission_time - l_ems_result.mean_request_transmission_time) write_result_file ("conversion-response", l_app_result.mean_response_transmission_time - l_ems_result.mean_response_transmission_time) end signalled (SIGTERM) end print_report (a_result: AP_PROFILING_RESULT) -- prints a report about 'a_result' to the terminal do print(a_result.id + " request transmissions: ") print_list(a_result.request_transmission_times) print(a_result.id + " request transmission mean: " + a_result.mean_request_transmission_time.out) print("%N%N") print(a_result.id + " response transmissions: ") print_list(a_result.response_transmission_times) print(a_result.id + " response transmission mean: " + a_result.mean_response_transmission_time.out) print("%N%N") end write_result_file (an_id: STRING; a_value: ANY) -- writes 'a_value' to a result file (pmd) using 'an_id' to uniquely identify the file local l_filename: STRING l_file: PLAIN_TEXT_FILE do l_filename := "aranea-profiling-" + an_id if message_length /= -1 then l_filename.append_string ("-l" + message_length.out) end l_filename.append ("-r" + repetitions.out) l_filename.append (".pmd") create l_file.make_create_read_write (l_filename) l_file.put_string ("YVALUE=") l_file.put_string (a_value.out) l_file.put_string ("%N") l_file.close end clean_results -- additional cleanup operations before the result is computed do -- remove unwanted ems stamps added by sending the AP_PROFILING_RESULTS_REQUEST_MESSAGE ems_timestamps.remove_last ems_timestamps.remove_last end feature -- command line parsing repetitions_option: AP_INTEGER_OPTION -- the nr. of repetitions message_size_option: AP_INTEGER_OPTION -- the nr of characters used as payload of the profiling messages write_result_files_flag: AP_FLAG -- should the results be exportetd to files? feature {NONE} -- Implementation main -- We need to redefine main otherwise we get blocked in the main loop -- local -- l_msg: A_GENERAL_STRING_MESSAGE do -- the following is just for internal testing -- create l_msg.make_from_string ("TEST") -- l_msg.set_reply_handler (agent handle_reply) -- print("sending ping msg") -- send_message (l_msg, aranea_prefix + echo_node_name) -- main_jobs.execute_all -- proccessing the reply is done in the main loop -- send_profiling_messages main_jobs.force (agent do send_profiling_messages end) Precursor end end