indexing description: "Provides resolver service" license: "MIT license (see ../../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class P2P_RESOLVER_SERVICE inherit P2P_MODULE redefine init end P2P_EXCEPTION_LOG create init feature {NONE} -- Initialization init (group: P2P_PEERGROUP; id: P2P_ID; advertisement: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT) is -- Initialize module local string_equality_tester: KL_EQUALITY_TESTER [STRING] do Precursor (group, id, advertisement) create query_handlers.make_default create response_handlers.make_default create srdi_handlers.make_default create string_equality_tester query_handlers.set_key_equality_tester (string_equality_tester) response_handlers.set_key_equality_tester (string_equality_tester) srdi_handlers.set_key_equality_tester (string_equality_tester) service_name := module_id.out query_service_parameter := peer_group.id.out_short + "ORes" response_service_parameter := peer_group.id.out_short + "IRes" srdi_service_parameter := peer_group.id.out_short + "Srdi" if peer_group.rendezvous_service = Void then logger.info ("resolver_service: Limited propagation possible due to unavailable rendezvous service, group id: " + peer_group.id.out) end end feature -- Access query_handler (name: STRING): PROCEDURE [ANY, TUPLE [P2P_RESOLVER_QUERY]] is -- Get registered query handler for given `name' require Name_valid: name /= Void and not name.is_empty do Result := query_handlers.item (name) end response_handler (name: STRING): PROCEDURE [ANY, TUPLE [P2P_RESOLVER_RESPONSE]] is -- Get registered response handler for given `name' require Name_valid: name /= Void and not name.is_empty do Result := response_handlers.item (name) end srdi_handler (name: STRING): PROCEDURE [ANY, TUPLE [P2P_RESOLVER_SRDI]] is -- Get registered srdi handler for given `name' require Name_valid: name /= Void and not name.is_empty do Result := srdi_handlers.item (name) end service_name: STRING query_service_parameter: STRING response_service_parameter: STRING srdi_service_parameter: STRING feature -- Element change extend_handler (name: STRING; a_query_handler: PROCEDURE [ANY, TUPLE [P2P_RESOLVER_QUERY]]; a_response_handler: PROCEDURE [ANY, TUPLE [P2P_RESOLVER_RESPONSE]]) is -- Register query and response handler under the given `name' require Name_valid: name /= Void and not name.is_empty Query_handler_valid: a_query_handler /= Void Response_handler_valid: a_response_handler /= Void do query_handlers.force (a_query_handler, name) response_handlers.force (a_response_handler, name) ensure Query_handler_set: query_handler (name) = a_query_handler Response_handler_set: response_handler (name) = a_response_handler end extend_srdi_handler (name: STRING; a_srdi_handler: PROCEDURE [ANY, TUPLE [P2P_RESOLVER_SRDI]]) is -- Register `a_srdi_handler' under the given `name' require Name_valid: name /= Void and not name.is_empty Srdi_handler_valid: a_srdi_handler /= Void do srdi_handlers.force (a_srdi_handler, name) ensure Srdi_handler_set: srdi_handler (name) = a_srdi_handler end feature -- Removal prune_handler (name: STRING) is -- Unregister query and response handler for a given `name' require Name_valid: name /= Void and not name.is_empty do query_handlers.remove (name) response_handlers.remove (name) ensure Query_handler_set: query_handler (name) = Void Response_handler_set: response_handler (name) = Void end prune_srdi_handler (name: STRING) is -- Unregister srdi handler for a given `name' require Name_valid: name /= Void and not name.is_empty do srdi_handlers.remove (name) ensure Srdi_handler_set: srdi_handler (name) = Void end feature -- Basic operations send_query (destination_peer: P2P_PEER_ID; query: P2P_RESOLVER_QUERY) is -- Propagate given query to specified address require Module_ok: module_status = start_ok or module_status = suspended Peer_valid: destination_peer /= Void and destination_peer.is_valid Query_valid: query /= Void and query.is_valid local query_msg: P2P_MESSAGE addr: P2P_ENDPOINT_ADDRESS do logger.debugging ("resolver_service: Sending resolver query to peer: " + destination_peer.out + ", handler: " + query.handler_name + ", id: " + query.query_id.out + ", group id: " + peer_group.id.out) query_msg := query_message (query) create addr.make_with_id (destination_peer, service_name, query_service_parameter) peer_group.endpoint_service.send_message_mangled (addr, query_msg, peer_group.id) logger.info ("resolver_service: Sent resolver query to peer: " + destination_peer.out + ", handler: " + query.handler_name + ", id: " + query.query_id.out + ", group id: " + peer_group.id.out) end propagate_query (query: P2P_RESOLVER_QUERY) is -- Propagate given query require Module_ok: module_status = start_ok or module_status = suspended Query_valid: query /= Void and query.is_valid local query_msg: P2P_MESSAGE do query_msg := query_message (query) if peer_group.rendezvous_service /= Void then -- propagate via rendezvous service logger.debugging ("resolver_service: Propagating resolver query with rendezvous service to handler: " + query.handler_name + ", id: " + query.query_id.out + ", group id: " + peer_group.id.out) peer_group.rendezvous_service.propagate_in_group (query_msg, service_name, query_service_parameter, peer_group.rendezvous_service.ttl_max) peer_group.rendezvous_service.propagate_to_neighbours (query_msg, service_name, query_service_parameter, max_hop_count) else -- propagate directly with endpoint service (to local network) logger.debugging ("resolver_service: Propagating resolver query to neighbours to handler: " + query.handler_name + ", id: " + query.query_id.out + ", group id: " + peer_group.id.out) peer_group.endpoint_service.propagate_mangled (query_msg, service_name, query_service_parameter, peer_group.id) end logger.info ("resolver_service: Propagated resolver query to handler: " + query.handler_name + ", id: " + query.query_id.out + ", group id: " + peer_group.id.out) end send_response (destination_peer: P2P_PEER_ID; response: P2P_RESOLVER_RESPONSE) is -- Send given response to specified address require Module_ok: module_status = start_ok or module_status = suspended Peer_valid: destination_peer /= Void and destination_peer.is_valid Response_valid: response /= Void and response.is_valid local response_msg: P2P_MESSAGE addr: P2P_ENDPOINT_ADDRESS do logger.debugging ("resolver_service: Sending resolver response to peer: " + destination_peer.out + ", handler: " + response.handler_name + ", id: " + response.query_id.out + ", group id: " + peer_group.id.out) response_msg := response_message (response) create addr.make_with_id (destination_peer, service_name, response_service_parameter) peer_group.endpoint_service.send_message_mangled (addr, response_msg, peer_group.id) logger.info ("resolver_service: Sent resolver response to peer: " + destination_peer.out + ", handler: " + response.handler_name + ", id: " + response.query_id.out + ", group id: " + peer_group.id.out) end propagate_response (response: P2P_RESOLVER_RESPONSE) is -- Propagate given query require Module_ok: module_status = start_ok or module_status = suspended Response_valid: response /= Void and response.is_valid local response_msg: P2P_MESSAGE do response_msg := response_message (response) if peer_group.rendezvous_service /= Void then -- propagate via rendezvous service logger.debugging ("resolver_service: Propagating resolver response with rendezvous service to handler: " + response.handler_name + ", id: " + response.query_id.out + ", group id: " + peer_group.id.out) peer_group.rendezvous_service.propagate (response_msg, service_name, response_service_parameter, peer_group.rendezvous_service.ttl_max) else -- propagate directly with endpoint service (to local network) logger.debugging ("resolver_service: Propagating resolver response to neighbours to handler: " + response.handler_name + ", id: " + response.query_id.out + ", group id: " + peer_group.id.out) peer_group.endpoint_service.propagate_mangled (response_msg, service_name, response_service_parameter, peer_group.id) end logger.info ("resolver_service: Propagated resolver response to handler: " + response.handler_name + ", id: " + response.query_id.out + ", group id: " + peer_group.id.out) end send_srdi_message (destination_peer: P2P_PEER_ID; srdi: P2P_RESOLVER_SRDI) is -- Send given SRDI message to specified peer require Module_ok: module_status = start_ok or module_status = suspended Peer_valid: destination_peer /= Void and destination_peer.is_valid Srdi_valid: srdi /= Void and srdi.is_valid local srdi_msg: P2P_MESSAGE addr: P2P_ENDPOINT_ADDRESS do logger.debugging ("resolver_service: Sending resolver SRDI message to peer: " + destination_peer.out + ", handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) srdi_msg := srdi_message (srdi) create addr.make_with_id (destination_peer, service_name, srdi_service_parameter) peer_group.endpoint_service.send_message_mangled (addr, srdi_msg, peer_group.id) logger.info ("resolver_service: Sent resolver SRDI message to peer: " + destination_peer.out + ", handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) end propagate_srdi (srdi: P2P_RESOLVER_SRDI) is -- Propagate given SRDI message require Module_ok: module_status = start_ok or module_status = suspended Srdi_valid: srdi /= Void and srdi.is_valid local srdi_msg: P2P_MESSAGE do srdi_msg := srdi_message (srdi) if peer_group.rendezvous_service /= Void then -- propagate via rendezvous service logger.debugging ("resolver_service: Propagating resolver SRDI message with rendezvous service to handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) peer_group.rendezvous_service.propagate_in_group (srdi_msg, service_name, srdi_service_parameter, peer_group.rendezvous_service.ttl_max) peer_group.rendezvous_service.propagate_to_neighbours (srdi_msg, service_name, srdi_service_parameter, max_hop_count) else -- propagate directly with endpoint service (to local network) logger.debugging ("resolver_service: Propagating resolver SRDI mesasge to neighbours to handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) peer_group.endpoint_service.propagate_mangled (srdi_msg, service_name, srdi_service_parameter, peer_group.id) end logger.info ("resolver_service: Propagated resolver SRDI message to handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) end process_query_message (msg: P2P_MESSAGE; src, dest: P2P_ENDPOINT_ADDRESS) is -- Process incoming query message require Module_ok: module_status = start_ok or module_status = suspended Message_valid: msg /= Void Source_valid: src /= Void and src.is_valid Destination_valid: dest /= Void and dest.is_valid and equal (dest.service_name, service_name) and equal (dest.service_parameter, query_service_parameter) local handler_failed: BOOLEAN el_query: P2P_MESSAGE_ELEMENT query: P2P_RESOLVER_QUERY repropagate: BOOLEAN do if not handler_failed then -- get resolver query el_query := msg.element_by_namespace_and_name (msg.namespace_jxta, query_service_parameter) if el_query /= Void then create query.parse_from_string (el_query.content) if query.is_valid then -- check hops if query.hop_count > max_hop_count then logger.error ("resolver_service: Discarding incoming resolver query, too many hops visited, handler: " + query.handler_name + ", query id: " + query.query_id.out + ", group id: " + peer_group.id.out) repropagate := False else -- publish source peer route advertisement if query.source_peer_route /= Void and peer_group.discovery_service /= Void then if query.source_peer_route.destination_peer_id = Void then query.source_peer_route.set_destination_peer_id (query.source_peer_id) end query.source_peer_route.set_lifetime_relative (query.source_peer_route.expiration_time) peer_group.discovery_service.publish_advertisement_locally (query.source_peer_route) end -- call handler if query_handlers.has (query.handler_name) then query_handlers.item (query.handler_name).call ([query]) repropagate := query.repropagate logger.info ("resolver_service: Passed resolver query to handler: " + query.handler_name + ", query id: " + query.query_id.out + ", repropagate: " + repropagate.out + ", group id: " + peer_group.id.out) else repropagate := True logger.error ("resolver_service: No handler registered for: " + query.handler_name + ", query id: " + query.query_id.out + ", repropagate: True, group id: " + peer_group.id.out) end end -- repropagate if hop_count not exceeded and rendezvous peer if repropagate and query.hop_count < max_hop_count and peer_group.rendezvous_service /= Void and peer_group.rendezvous_service.is_rendezvous then logger.info ("resolver_service: Repropagating resolver query, handler: " + query.handler_name + ", query id: " + query.query_id.out + ", group id: " + peer_group.id.out) -- update hop count and repropagate query.increment_hop_count el_query.set_content (query.out) peer_group.rendezvous_service.propagate (msg, service_name, query_service_parameter, peer_group.rendezvous_service.ttl_max) end end end end rescue handler_failed := True log_exceptions retry end process_response_message (msg: P2P_MESSAGE; src, dest: P2P_ENDPOINT_ADDRESS) is -- Pass the incoming response message to the appropriate handler require Module_ok: module_status = start_ok or module_status = suspended Message_valid: msg /= Void Source_valid: src /= Void and src.is_valid Destination_valid: dest /= Void and dest.is_valid and equal (dest.service_name, service_name) and equal (dest.service_parameter, response_service_parameter) local handler_failed: BOOLEAN el_response: P2P_MESSAGE_ELEMENT response: P2P_RESOLVER_RESPONSE do if not handler_failed then -- get resolver response el_response := msg.element_by_namespace_and_name (msg.namespace_jxta, response_service_parameter) if el_response /= Void then create response.parse_from_string (el_response.content) if response.is_valid then if response_handlers.has (response.handler_name) then -- call handler response_handlers.item (response.handler_name).call ([response]) logger.info ("resolver_service: Passed resolver response to handler: " + response.handler_name + ", query id: " + response.query_id.out +", group id: " + peer_group.id.out) else logger.error ("resolver_service: No handler found for resolver response to handler: " + response.handler_name + ", query id: " + response.query_id.out +", group id: " + peer_group.id.out) end end end end rescue handler_failed := True log_exceptions retry end process_srdi_message (msg: P2P_MESSAGE; src, dest: P2P_ENDPOINT_ADDRESS) is -- Pass the incoming srdi message to the appropriate handler require Module_ok: module_status = start_ok or module_status = suspended Message_valid: msg /= Void Source_valid: src /= Void and src.is_valid Destination_valid: dest /= Void and dest.is_valid and equal (dest.service_name, service_name) and equal (dest.service_parameter, srdi_service_parameter) local handler_failed: BOOLEAN el_srdi: P2P_MESSAGE_ELEMENT srdi: P2P_RESOLVER_SRDI do if not handler_failed then -- get resolver srdi el_srdi := msg.element_by_namespace_and_name (msg.namespace_jxta, srdi_service_parameter) if el_srdi /= Void then create srdi.parse_from_string (el_srdi.content) if srdi.is_valid then if srdi_handlers.has (srdi.handler_name) then -- call handler srdi_handlers.item (srdi.handler_name).call ([srdi]) logger.info ("resolver_service: Passed resolver srdi to handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) else logger.error ("resolver_service: No handler found for resolver srdi to handler: " + srdi.handler_name + ", group id: " + peer_group.id.out) end end end end rescue handler_failed := True log_exceptions retry end start (args: ARRAY [STRING]) is -- Start module do if module_status = initializing then -- Register endpoint listeners peer_group.endpoint_service.extend_service (service_name, query_service_parameter, agent process_query_message) peer_group.endpoint_service.extend_service (service_name, response_service_parameter, agent process_response_message) peer_group.endpoint_service.extend_service (service_name, srdi_service_parameter, agent process_srdi_message) end module_status := start_ok end suspend is -- Suspend module do module_status := suspended end stop is -- Stop module do -- Unregister endpoint listeners peer_group.endpoint_service.prune_service (service_name, query_service_parameter) peer_group.endpoint_service.prune_service (service_name, response_service_parameter) peer_group.endpoint_service.prune_service (service_name, srdi_service_parameter) module_status := stop_ok end feature {NONE} -- Implementation query_handlers: DS_HASH_TABLE [PROCEDURE [ANY, TUPLE [P2P_RESOLVER_QUERY]], STRING] response_handlers: DS_HASH_TABLE [PROCEDURE [ANY, TUPLE [P2P_RESOLVER_RESPONSE]], STRING] srdi_handlers: DS_HASH_TABLE [PROCEDURE [ANY, TUPLE [P2P_RESOLVER_SRDI]], STRING] Max_hop_count: INTEGER is 2 check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN -- Are all needed dependencies met? do Result := a_parent.endpoint_service /= Void end query_message (a_query: P2P_RESOLVER_QUERY): P2P_MESSAGE is -- Package query into a endpoint message require Query_valid: a_query /= Void and a_query.is_valid local el_query: P2P_MESSAGE_ELEMENT route: P2P_ROUTE_ADVERTISEMENT do if a_query.source_peer_route = Void and a_query.source_peer_id.is_equal (peer_group.peer_id) then -- add our route advertisement route ?= peer_group.peer_advertisement.service_parameter (endpoint_mcid) if route /= Void then route.set_destination_peer_id (peer_group.peer_id) a_query.set_source_peer_route (route) end end create Result.make create el_query.make_xml (Result.namespace_jxta, query_service_parameter, Void, a_query.out) Result.extend (el_query) ensure Result_set: Result /= Void end response_message (a_response: P2P_RESOLVER_RESPONSE): P2P_MESSAGE is -- Package response into a endpoint message require Response_valid: a_response /= Void and a_response.is_valid local el_response: P2P_MESSAGE_ELEMENT do create Result.make create el_response.make_xml (Result.namespace_jxta, response_service_parameter, Void, a_response.out) Result.extend (el_response) ensure Result_set: Result /= Void end srdi_message (a_srdi: P2P_RESOLVER_SRDI): P2P_MESSAGE is -- Package SRDI message into a endpoint message require Srdi_valid: a_srdi /= Void and a_srdi.is_valid local el_srdi: P2P_MESSAGE_ELEMENT do create Result.make -- FIXME 20070108 beatstr: Would be nice if payload would be compressed (JXTA/J2SE does that, application/gzip) create el_srdi.make_xml (Result.namespace_jxta, srdi_service_parameter, Void, a_srdi.out) Result.extend (el_srdi) ensure Result_set: Result /= Void end end