indexing description: "Base class for an orgio node." author: "Patrick Ruckstuhl " date: "$Date$" revision: "$Revision$" deferred class O_NODE inherit O_SHARED_LOGGERS P2P_CONSTANTS export {NONE} all end O_CONSTANTS POSIX_SIGNAL_HANDLER redefine signal_switch_is_real_singleton end POSIX_CONSTANTS export {NONE} all end POSIX_DAEMON rename exit as daemon_exit, sleep as daemon_sleep, pid as daemon_pid end ARGUMENT_OPTION_PARSER rename make as make_argument_parser, execute as execute_argument_parser export {NONE} all redefine copyright end EXCEPTIONS export {NONE} all end feature {NONE} -- Initialization make is -- Start node local l_exec: EXECUTION_ENVIRONMENT do create start_time.make_now_utc create job_queue.make -- store the current working directory before daemonizing -- to set it again after we daemonized create l_exec working_directory := l_exec.current_working_directory -- setup and launch argument parser make_argument_parser (False, True) set_add_application_switches (False) set_use_separated_switch_values (True) -- if we have a valid command line, launch the progam -- depending on the arguments directly or daemonized execute_argument_parser (agent do -- launch daemonized? if is_daemonize then detach else execute end end) end execute is -- Execute as daemon. local l_signal_handler: POSIX_SIGNAL l_exec: EXECUTION_ENVIRONMENT l_pid_file: PLAIN_TEXT_FILE do if is_daemonize then -- write pid file create l_pid_file.make ("/var/run/origo/"+peer_name+".pid") if l_pid_file.exists and then l_pid_file.is_writable or else l_pid_file.is_creatable then l_pid_file.open_write l_pid_file.put_integer (daemon_pid) l_pid_file.close end -- set working directory again (it's lost because of the fork) create l_exec l_exec.change_working_directory (working_directory) end -- setup sigterm signal handler create l_signal_handler.make (SIGTERM) l_signal_handler.set_handler (Current) l_signal_handler.apply create_loggers origo_logger.info ("Starting Origo node") peer_start end peer_start is -- Start vamPeer platform local failed: BOOLEAN l_adv: P2P_PEER_ADVERTISEMENT do if not failed then -- create vampeer platform origo_logger.info ("Loading/configuring peer platform") configure_platform if vampeer.module_status = vampeer.initializing then origo_logger.info ("Peer platform successfully initialized") peer_module ?= opg.lookup_module (p2p_module_name) check Peer_module_loaded: peer_module /= Void end register_message_handlers -- Set the expiration time to 30 minutes l_adv := opg.discovery_service.local_peer_advertisement (opg.peer_id) if l_adv /= Void then l_adv.set_expiration_time (30*60*100) end opg.peer_advertisement.set_expiration_time (30*60*1000) -- now start platform origo_logger.info ("Starting peer network now") vampeer.start if vampeer.module_status = vampeer.start_ok then -- peer platform is started origo_logger.info ("Origo network successfully started") origo_startup main origo_shutdown -- stop node and peer platform origo_logger.debugging ("Stopping Origo node") stop else origo_logger.error ("Error starting peer platform") end else origo_logger.error ("Error configuring peer platform") end end rescue failed := True node_logger.fatal ("Exception in peer_start: "+exception_trace) retry end origo_startup is -- Called after origo is started. do end origo_shutdown is -- Called before origo is shutdown. do end main is -- Main routine after peer platform is successfully started (main event loop) require Vampeer_started: vampeer /= Void and vampeer.module_status = vampeer.start_ok do from until is_quit loop job_queue.execute_all end rescue node_logger.fatal ("Exception in main: "+exception_trace) retry end stop is -- Stop node local failed: BOOLEAN do if not failed then -- shutdown peer platform, if started if vampeer /= Void and then (vampeer.module_status = vampeer.start_ok or vampeer.module_status = vampeer.suspended) then vampeer.stop end -- Unreference it vampeer := Void opg := Void end rescue failed := True node_logger.fatal ("Exception during stop: "+exception_trace) retry end feature -- Status is_daemonize: BOOLEAN -- Should we deamonize? once Result := has_option (daemonize_switch) end log_level: L4E_PRIORITY is -- Which log level to use. local l_level: STRING once if has_option (log_switch) then l_level := option_of_name (log_switch).value if l_level.is_equal ("debug") then Result := debug_p elseif l_level.is_equal ("info") then Result := info_p else Result := warn_p end else Result := warn_p end end listen_address: STRING is -- Which address to listen on. once if has_option (listen_switch) then Result := option_of_name (listen_switch).value else Result := "127.0.0.1" end ensure Result_not_void: Result /= Void end seed_url: STRING is -- JXTA seed url. once if has_option (seed_switch) then Result := option_of_name (seed_switch).value else Result := "http://127.0.0.1/rdv.txt" end end feature -- Access start_time: DATE_TIME -- Time of the startup vampeer: P2P_PLATFORM opg: P2P_PEERGROUP Peer_name: STRING is -- Node's peer name deferred ensure Result_set: Result /= Void and not Result.is_empty end Peer_description: STRING is -- Node's peer description deferred end configuration_directory: STRING is -- Vampeers configuration directory once Result := ".jxta_origo" ensure Result_set: Result /= Void end peer_module: O_P2P_MODULE -- Peer module. copyright: STRING is "Copyright Patrick Ruckstuhl 2007. All Rights Reserved." feature -- Message sending send_message_reply (a_message, a_original_message: O_MESSAGE) -- Send a_message as a reply to a_original_message. require a_message_ok: a_message /= Void a_original_message_ok: a_original_message /= Void peer_module_set: peer_module /= Void do if not peer_module.send_message_reply (a_message, a_original_message) then origo_logger.error ("Could not send message reply.") end end feature -- Handler registration register_catch_all_message_handler (a_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]) is -- Register a catch all message handler that receives all received messages. require a_handler_ok: a_handler /= Void peer_module_set: peer_module /= Void do peer_module.register_catch_all_message_handler (a_handler) end register_message_handler (a_namespace, a_type: STRING; a_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]) is -- Register a message handler to deal with a_type in a_namespace. require a_namespace_ok: a_namespace /= Void and then not a_namespace.is_empty a_type_ok: a_type /= Void and then not a_type.is_empty a_handler_ok: a_handler /= Void peer_module_set: peer_module /= Void do peer_module.register_message_handler (a_namespace, a_type, a_handler) end register_main_loop_message_handler (a_namespace, a_type: STRING; a_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]) is -- Register a message handler that should be executed in the main loop to deal with a_type in a_namespace. require a_namespace_ok: a_namespace /= Void and then not a_namespace.is_empty a_type_ok: a_type /= Void and then not a_type.is_empty a_handler_ok: a_handler /= Void peer_module_set: peer_module /= Void do -- create a wrapper agent that adds the call to the handler to the job_queue register_message_handler (a_namespace, a_type, agent (aa_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]; a_msg: O_MESSAGE) do job_queue.force (agent aa_handler.call ([a_msg])) end (a_handler, ?)) end feature {NONE} -- Argument parsing switches: ARRAYED_LIST [ARGUMENT_SWITCH] -- Argument switches once create Result.make (4) Result.extend (create {ARGUMENT_SWITCH}.make (daemonize_switch, "Daemonize after startup?", True, False)) Result.extend (create {ARGUMENT_VALUE_SWITCH}.make (log_switch, "Log level.", True, False, "LEVEL", "Log level (debug, info, warning).", False)) Result.extend (create {ARGUMENT_VALUE_SWITCH}.make (listen_switch, "Listen address", True, False, "IP Address", "Address to listen on (127.0.0.1).", False)) Result.extend (create {ARGUMENT_VALUE_SWITCH}.make (seed_switch, "JXTA seed address", True, False, "URL", "URL of the JXTA seed (http://127.0.0.1/rdv.txt).", False)) end daemonize_switch: STRING is "d" log_switch: STRING is "log" listen_switch: STRING is "listen" seed_switch: STRING is "seed" feature {NONE} -- Callbacks signalled (signal_value: INTEGER) is -- We received a signal. do if signal_value = SIGTERM then -- add a job to the main queue that set's quit to true. job_queue.force (agent do is_quit := True end) end end feature {NONE} -- Implementation working_directory: STRING -- Working directory to use. is_quit: BOOLEAN -- Should we quit? register_message_handlers is -- Register message handlers. require peer_module_set: peer_module /= Void do end job_queue: JOB_QUEUE -- Jobs to process in main thread. create_loggers is -- Create log hierarchy and set `logger' local l_node_appender, l_vampeer_appender, l_origo_appender: L4E_FILE_APPENDER l_log_layout: L4E_DATE_TIME_LAYOUT do -- create log hierarchy create l_log_layout -- create node logger create l_node_appender.make ("node.log", true) l_node_appender.set_layout (l_log_layout) node_logger.set_priority (log_level) node_logger.add_appender (l_node_appender) -- create origo logger create l_origo_appender.make ("origo.log", true) l_origo_appender.set_layout (l_log_layout) origo_logger.set_priority (log_level) origo_logger.add_appender (l_origo_appender) -- create vampeer logger create l_vampeer_appender.make ("vampeer.log", true) l_vampeer_appender.set_layout (l_log_layout) vampeer_logger.set_priority (log_level) vampeer_logger.add_appender (l_vampeer_appender) end configure_platform is -- Configure platform and initialize net peer group require Configuration_directory_valid: configuration_directory /= Void local opg_mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT do create vampeer.make (configuration_directory, vampeer_logger) if not vampeer.is_configured or not vampeer.peer_name.is_equal (peer_name) then -- configure vampeer platform vampeer.configure (platform_configuration) end -- now create our private net peer group, the Origo Peer Group (OPG) if not vampeer.cache_manager.has_module_implementation_advertisement (origo_peergroup_msid) then vampeer_logger.info ("vampeer_platform: Creating new Origo peer group module implementation advertisement") opg_mia := peergroup_implementation_advertisement vampeer.cache_manager.store_module_implementation_advertisement (opg_mia) end opg ?= vampeer.load_net_peergroup (origo_peergroup_id, origo_peergroup_msid, agent peergroup_loader) if opg /= Void and opg.module_status /= opg.init_failed then opg.group_advertisement.set_name (group_name) opg.group_advertisement.set_description (group_description) vampeer.cache_manager.store_peergroup_advertisement (opg.group_advertisement) vampeer_logger.info ("vampeer_platform: Origo peer group initialized") end end peergroup_loader (a_pg: P2P_PEERGROUP; an_id: P2P_ID; a_mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT): P2P_MODULE is -- Load OPG require Peergroup_valid: a_pg /= Void Id_valid: an_id /= Void and an_id.is_valid Mia_valid: a_mia /= Void and a_mia.is_valid deferred end platform_configuration: P2P_CONFIGURATION is -- Create platform configuration local opg_id: P2P_PEERGROUP_UUID pid: P2P_PEER_ID tcp_conf: P2P_TCP_CONFIGURATION rdv_conf: P2P_RENDEZVOUS_CONFIGURATION do -- create peer id opg_id ?= origo_peergroup_id create pid.make_new_with_group (opg_id.uuid) -- create configuration with peer id create Result.make_with_id (pid) Result.set_name (peer_name) if peer_description /= Void then Result.set_description (peer_description) end -- add tcp configuration create tcp_conf.make_auto tcp_conf.set_multicast_enabled (False) tcp_conf.set_interface_address (listen_address) Result.add_service_parameter (transport_tcp_mcid, tcp_conf) -- add rendezvous configuration create rdv_conf.make rdv_conf.add_seed_uri (seed_url) Result.add_service_parameter (rendezvous_mcid, rdv_conf) ensure Result_set: Result /= Void and Result.is_valid end peergroup_implementation_advertisement: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT is -- The Origo peer group impl adv require Vampeer_valid: vampeer /= Void local params_doc: P2P_XML_DOCUMENT mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT do Result := vampeer.peergroup_implementation_advertisement (origo_peergroup_msid, group_code_name, group_description) params_doc := Result.parameter.document -- Add origo core service mia := vampeer.default_implementation_advertisement (origo_core_msid, p2p_module_code_name, p2p_module_description) params_doc.create_root_child_element ("Svc", namespace_empty) params_doc.add_child_element (params_doc.last_element, mia.document.document.root_element) end feature {NONE} -- Contracts signal_switch_is_real_singleton: BOOLEAN is True invariant job_queue_not_void: job_queue /= Void working_directory_not_void: working_directory /= Void end