indexing
	description: "Router transport module"
	license: "MIT license (see ../../../license.txt)"
	author: "Beat Strasser "
	date: "$Date$"
	revision: "$Revision$"

class P2P_ENDPOINT_ROUTER
		-- Message transport registered under protocol name "jxta".
		-- Outgoing messages are either looped back to the local endpoint service
		-- or handed to another sender transport for which a directly reachable
		-- endpoint address of the destination peer is known from locally cached
		-- route advertisements. Incoming messages addressed to service
		-- `EndpointRouter' get their original source/destination addresses
		-- restored and are then re-dispatched through the endpoint service.

inherit

	P2P_MESSAGE_SENDER_TRANSPORT
		redefine
			init, check_dependencies, start, stop
		end

	P2P_EXCEPTION_LOG

create

	init

feature {NONE} -- Initialization

	init (group: P2P_PEERGROUP; id: P2P_ID; advertisement: like implementation_advertisement) is
			-- Initialize module: build `local_address' from the group's peer id
			-- and remember the group's discovery service.
		do
			Precursor (group, id, advertisement)
			create local_address.make_with_id (peer_group.peer_id, Void, Void)
			discovery_service := peer_group.discovery_service
		end

feature -- Access

	name: STRING is "jxta"
			-- Protocol name

feature -- Basic operations

	process_incoming_message (message: P2P_MESSAGE; source, destination: P2P_ENDPOINT_ADDRESS) is
			-- Process incoming message (to service `EndpointRouter' via endpoint service).
			-- When `message' carries a valid endpoint router message element, that
			-- element is removed, the source and destination address elements are
			-- rewritten to the values it records, and `message' is passed on to
			-- `endpoint_service.demux'. Otherwise `message' is discarded with an
			-- error log entry.
		require
			Status_started: module_status = start_ok or module_status = suspended
			Message_valid: message /= Void
			Source_valid: source /= Void and then source.is_valid
			Destination_valid: destination /= Void and then destination.is_valid
		local
			failed: BOOLEAN
			el_erm, el_addr: P2P_MESSAGE_ELEMENT
			erm: P2P_ENDPOINT_ROUTER_MESSAGE
		do
			if not failed then
				logger.debugging ("endpoint_router: Processing incoming message, group id: " + peer_group.id.out)

					-- get endpoint router message element
				el_erm := message.element_by_namespace_and_name (message.namespace_jxta, jxta_endpoint_router_message_element)
				if el_erm /= Void then
					create erm.parse_from_string (el_erm.content)
					if not erm.is_valid then
							-- treat an unparsable element like a missing one
						erm := Void
					end
				end

				if erm /= Void then
						-- remove endpoint router message element
					message.remove (el_erm)

						-- rewrite endpoint source address to original value
					create el_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_source_address_element_name, Void, erm.source.out)
					message.replace (el_addr)

						-- rewrite endpoint destination address to original value
					create el_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_destination_address_element_name, Void, erm.destination.out)
					message.replace (el_addr)

						-- now pass the message to the real service
					logger.debugging ("endpoint_router: Passing message to: " + erm.destination.out + ", group id: " + peer_group.id.out)
					endpoint_service.demux (message)
				else
					logger.error ("endpoint_router: No valid endpoint router message: discarding message, group id: " + peer_group.id.out)
				end
			else
					-- second pass after `retry': only report the failure
				logger.error ("endpoint_router: Error processing incoming message, group id: " + peer_group.id.out)
			end
		rescue
				-- catch all failures from called modules/services during processing
			failed := True
			log_exceptions
			retry
		end

	ping (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is
			-- Is given peer reachable?
			-- True for the local peer itself, or when `find_local_gateway' finds
			-- an endpoint that answers (connections are actively attempted).
		require else
			Status_ok: module_status = start_ok or (module_status = suspended and peer_group.endpoint_service.module_status = start_ok)
			Address_processable: address /= Void and then address.protocol_name.is_equal (name)
		do
				-- `or else': skip route lookup and connection attempts for loopback
			Result := local_address.is_equal_without_service (address) or else find_local_gateway (address, True) /= Void
		end

	has_open_connection (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is
			-- Does open connection to peer with given `address' exist?
			-- True for the local peer itself, or when `find_local_gateway' finds
			-- an endpoint with an already open connection (no connection attempt).
		require else
			Status_ok: module_status = start_ok or (module_status = suspended and peer_group.endpoint_service.module_status = start_ok)
			Address_processable: address /= Void and then address.protocol_name.is_equal (name)
		do
				-- `or else': skip route lookup for loopback
			Result := local_address.is_equal_without_service (address) or else find_local_gateway (address, False) /= Void
		end

	propagate (a_message: P2P_MESSAGE; a_service_name, a_service_parameter: STRING) is
			-- Don't do anything
		require else
			Status_ok: module_status = start_ok or (module_status = suspended and peer_group.endpoint_service.module_status = start_ok)
			Message_valid: a_message /= Void
			Service_valid: a_service_name /= Void
		do
		end

	send_message (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE) is
			-- Send message from our peer (only!) to a peer
		require else
			Status_ok: module_status = start_ok or (module_status = suspended and peer_group.endpoint_service.module_status = start_ok)
			Address_processable: address /= Void and then address.protocol_name.is_equal (name) and then address.service_name /= Void
			Message_valid: message /= Void
		do
			send_message_with_originator (address, message, False)
		end

	send_message_with_originator (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE; foreign_sender: BOOLEAN) is
			-- Send message to a peer and seek for routes if we're the originator.
			-- `foreign_sender' is True when `message' did not originate at this peer.
			-- Loopback addresses are dispatched locally through `endpoint_service.demux';
			-- otherwise `message' is sent through the first gateway returned by
			-- `find_local_gateway'. When no gateway is found the message is not sent
			-- and a warning is logged.
		require else
			Status_ok: module_status = start_ok or (module_status = suspended and peer_group.endpoint_service.module_status = start_ok)
			Address_processable: address /= Void and then address.protocol_name.is_equal (name) and then address.service_name /= Void
			Message_valid: message /= Void
		local
			el_source_addr, el_dest_addr: P2P_MESSAGE_ELEMENT
			gateway: P2P_ENDPOINT_ADDRESS
		do
				-- Loopback?
			if local_address.is_equal_without_service (address) then
					-- add source/destination data to message
				create el_source_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_source_address_element_name, Void, local_address.out_without_service)
				message.replace (el_source_addr)
				create el_dest_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_destination_address_element_name, Void, address.out)
				message.replace (el_dest_addr)
				endpoint_service.demux (message)
			else
					-- Find real endpoint address and try to connect directly
				gateway := find_local_gateway (address, True)
				if gateway /= Void then
						-- add service parameters to address
					create gateway.make_with_service (gateway.protocol_name, gateway.protocol_address, address.service_name, address.service_parameter)

						-- direct connection is possible, so send message now
					endpoint_service.send_message (gateway, message)
				elseif not foreign_sender then
						-- seek for a route, if we're the message's originator
						-- TODO 20061124 beatstr: now look for a long route (with router queries using resolver service, try several times, max 30s),
						-- add EndpointRouterMsg element to message
					logger.warn ("endpoint_router: Unable to find route to peer (not yet implemented), address: " + address.out + ", group id: " + peer_group.id.out)
				else
						-- not the originator and no direct gateway: nothing is sent,
						-- so leave a trace instead of dropping the message silently
					logger.warn ("endpoint_router: No direct route to peer, dropping forwarded message, address: " + address.out + ", group id: " + peer_group.id.out)
				end
			end
		end

	start (args: ARRAY [STRING]) is
			-- start module
			-- The incoming-message listener is registered only while
			-- `module_status' is still `initializing'.
		do
			if module_status = initializing then
					-- register service listener
				endpoint_service.extend_service (service_name, Void, agent process_incoming_message)
			end
			Precursor (args)
		end

	stop is
			-- stop module
		do
				-- unregister service listener
			endpoint_service.prune_service (service_name, Void)
			Precursor
		end

feature {NONE} -- Implementation

	Jxta_endpoint_router_message_element: STRING is "EndpointRouterMsg"
			-- Name of the message element carrying the endpoint router message

	Service_name: STRING is "EndpointRouter"
			-- Service name under which the incoming-message listener is registered

	discovery_service: P2P_DISCOVERY_SERVICE
			-- Discovery service of the peer group, set in `init'

	find_local_gateway (address: P2P_ENDPOINT_ADDRESS; connect: BOOLEAN): P2P_ENDPOINT_ADDRESS is
			-- Find real endpoint addresses for given peer `address', return best working endpoint or Void when direct connection impossible.
			-- With `connect' set, try actively to open a connection.
			-- Candidates whose transport already reports an open connection are
			-- tried before all others.
		require
			Address_valid: address /= Void and then address.is_valid and then address.protocol_name.is_equal (name)
		local
			route_advs: DS_LIST [P2P_ROUTE_ADVERTISEMENT]
			eas, valid_eas: DS_LIST [P2P_ENDPOINT_ADDRESS]
			ea: P2P_ENDPOINT_ADDRESS
			sender: P2P_MESSAGE_SENDER_TRANSPORT
		do
				-- get all endpoint addresses from cached route advertisement
			create {DS_LINKED_LIST [P2P_ENDPOINT_ADDRESS]} valid_eas.make
			route_advs := route_advertisements (address)
			from
				route_advs.start
			until
				route_advs.after
			loop
				eas := route_advs.item_for_iteration.destination.endpoint_addresses
				from
					eas.start
				until
					eas.after
				loop
						-- check for implemented sender transport, skipping jxta transport
					ea := eas.item_for_iteration
					sender ?= endpoint_service.message_transport (ea.protocol_name)
					if sender /= Void and then not sender.name.is_equal (name) then
						if sender.has_open_connection (ea) then
								-- already connected: prefer this endpoint
							valid_eas.put_first (ea)
						else
							valid_eas.put_last (ea)
						end
					end
						-- NOTE(review): guard kept from the original; presumably protects
						-- against the cursor having been moved meanwhile - confirm
					if not eas.after then
						eas.forth
					end
				end
				route_advs.forth
			end

				-- try to connect until reached a peer (or use already open connection)
			from
				valid_eas.start
			until
					-- `or else': the item must not be read once the cursor is `after'
					-- (in particular when there are no candidates at all)
				valid_eas.after or else connection_is_open (valid_eas.item_for_iteration, connect)
			loop
				valid_eas.forth
			end
			if not valid_eas.after then
				Result := valid_eas.item_for_iteration
			end
		end

	connection_is_open (address: P2P_ENDPOINT_ADDRESS; connect: BOOLEAN): BOOLEAN is
			-- Is connection to `address' open? Actively try to connect when `connect' is True.
		do
			if connect then
				Result := endpoint_service.ping (address)
			else
				Result := endpoint_service.has_open_connection (address)
			end
		end

	check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN is
			-- Are all needed dependencies met?
			-- In addition to the inherited checks, `a_parent' must provide
			-- a discovery service.
		do
			Result := Precursor (a_parent) and then a_parent.discovery_service /= Void
		end

	route_advertisements (address: P2P_ENDPOINT_ADDRESS): DS_LIST [P2P_ROUTE_ADVERTISEMENT] is
			-- List of all route advertisements for given `address'
			-- Taken from the discovery service's local peer advertisements
			-- (attribute "PID") and local general advertisements (attribute
			-- "DstPID") matching `address.protocol_address'; only route
			-- advertisements with a non-Void destination are included.
		require
			Address_valid: address /= Void and then address.is_valid and then address.protocol_name.is_equal (name)
		local
			peer_advs: DS_LIST [P2P_PEER_ADVERTISEMENT]
			general_advs: DS_LIST [P2P_ADVERTISEMENT]
			route_adv: P2P_ROUTE_ADVERTISEMENT
		do
			create {DS_LINKED_LIST [P2P_ROUTE_ADVERTISEMENT]} Result.make

				-- fetch route advertisements from matching peer advertisement
			peer_advs := discovery_service.local_peer_advertisements ("PID", address.protocol_address)
			from
				peer_advs.start
			until
				peer_advs.after
			loop
				route_adv ?= peer_advs.item_for_iteration.service_parameter (peer_group.endpoint_mcid)
				if route_adv /= Void and then route_adv.destination /= Void then
					Result.put_last (route_adv)
				end
				peer_advs.forth
			end

				-- fetch matching route advertisements
			general_advs := discovery_service.local_general_advertisements ("DstPID", address.protocol_address)
			from
				general_advs.start
			until
				general_advs.after
			loop
				route_adv ?= general_advs.item_for_iteration
				if route_adv /= Void and then route_adv.destination /= Void then
					Result.put_last (route_adv)
				end
				general_advs.forth
			end
		ensure
			Result_set: Result /= Void
		end

end