indexing description: "Provides Endpoint Service" license: "MIT license (see ../../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class P2P_ENDPOINT_SERVICE inherit P2P_MODULE redefine init end create init feature {NONE} -- Initialization init (group: P2P_PEERGROUP; id: P2P_ID; advertisement: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT) is -- Initialize module local string_equality_tester: KL_EQUALITY_TESTER [STRING] filter_equality_tester: KL_EQUALITY_TESTER [TUPLE [ FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING]] do Precursor (group, id, advertisement) -- create data structures create string_equality_tester create filter_equality_tester create transports.make (initial_transports_count) transports.set_key_equality_tester (string_equality_tester) create services.make (initial_services_count) services.set_key_equality_tester (string_equality_tester) create incoming_message_filters.make incoming_message_filters.set_equality_tester (filter_equality_tester) create outgoing_message_filters.make outgoing_message_filters.set_equality_tester (filter_equality_tester) end feature -- Access Endpoint_source_address_element_name: STRING is "EndpointSourceAddress" Endpoint_destination_address_element_name: STRING is "EndpointDestinationAddress" Endpoint_source_peer_element_name: STRING is "EndpointHeaderSrcPeer" Group_mangling_service_prefix: STRING is "EndpointService:" message_transport (name: STRING): P2P_MESSAGE_TRANSPORT is -- Get message transport by name do if transports.has (name) then Result := transports.item (name) end end message_transports: DS_LIST [P2P_MESSAGE_TRANSPORT] is -- Get all registered message transports local cursor: DS_HASH_TABLE_CURSOR [P2P_MESSAGE_TRANSPORT, STRING] do create {DS_ARRAYED_LIST [P2P_MESSAGE_TRANSPORT]} Result.make (transports.count) from cursor := transports.new_cursor cursor.start until cursor.after loop Result.put_last (cursor.item) cursor.forth end ensure Result_existent: Result /= Void end registered_services: DS_LIST [STRING] is -- Get all registered service names local string_equality_tester: KL_EQUALITY_TESTER [STRING] do create {DS_ARRAYED_LIST [STRING]} Result.make_from_linear (services.keys) create string_equality_tester Result.set_equality_tester (string_equality_tester) ensure Result_existent: Result /= Void and Result.equality_tester /= Void end handler_name (service_name, service_parameter: STRING): STRING is -- Handler name for `service_name' and `service_parameter' require Name_valid: service_name /= Void and not service_name.has ('/') do Result := service_name.twin if service_parameter /= Void then Result.append ("/" + service_parameter) end ensure Handler_set: Result /= Void end route_advertisement: P2P_ROUTE_ADVERTISEMENT is -- Peers current route advertisement included in the peers advertisement do Result ?= peer_group.peer_advertisement.service_parameter (endpoint_mcid) end accesspoint_advertisement: P2P_ACCESSPOINT_ADVERTISEMENT is -- Peers current destination accesspoint advertisement included in the route advertisement local ra: like route_advertisement do ra := route_advertisement if ra /= Void and ra.is_valid then Result := ra.destination end end feature -- Status is_processable (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is -- Is message transport for given address registered and can be handled? require Address_valid: address /= Void do Result := transports.has (address.protocol_name) end feature -- Element change extend_message_transport (transport: P2P_MESSAGE_TRANSPORT) is -- Register message transport require Name_valid: transport.name /= Void do transports.force (transport, transport.name) logger.info ("endpoint_service: Extended message transport, name: " + transport.name + ", group: " + peer_group.id.out) ensure Transport_registered: message_transport (transport.name) = transport end extend_incoming_message_filter (filter: FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE]; namespace: STRING; name: STRING) is -- Register incoming message filter. `namespace' and `name' restrict the filter appliance to messages -- which have elements with given namespace or element names. require Filter_valid: filter /= Void local entry: TUPLE [FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING] do entry := [filter, namespace, name] if not incoming_message_filters.has (entry) then incoming_message_filters.put_last (entry) logger.info ("endpoint_service: Extended incoming message filter, group: " + peer_group.id.out) end end extend_outgoing_message_filter (filter: FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE]; namespace: STRING; name: STRING) is -- Register outgoing message filter. `namespace' and `name' restrict the filter appliance to messages -- which have elements with given namespace or element names. require Filter_valid: filter /= Void local entry: TUPLE [FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING] do entry := [filter, namespace, name] if not outgoing_message_filters.has (entry) then outgoing_message_filters.put_last (entry) logger.info ("endpoint_service: Extended outgoing message filter, group: " + peer_group.id.out) end end extend_service (service_name, service_parameter: STRING; listener: PROCEDURE [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS]]) is -- Register message listener with its service name -- Silently replaces old listener for the given service name. require Name_valid: service_name /= Void and not service_name.has ('/') Listener_valid: listener /= Void local handler: STRING do handler := handler_name (service_name, service_parameter) services.force (listener, handler) if not handler.substring (1, group_mangling_service_prefix.count).is_equal (group_mangling_service_prefix) then -- register service in parent group if peer_group.parent_group.endpoint_service /= Current and peer_group.parent_group.endpoint_service /= Void then peer_group.parent_group.endpoint_service.extend_service (group_mangling_service_prefix + peer_group.id.out_short, handler, listener) elseif peer_group.parent_group.id.is_worldgroup_id then -- as we don't have a running world peer group, we register the indirect service name also here services.force (listener, handler_name (group_mangling_service_prefix + peer_group.id.out_short, handler)) end end logger.info ("endpoint_service: Extended service, name: " + handler + ", group: " + peer_group.id.out) ensure Service_registered: registered_services.has (handler_name (service_name, service_parameter)) end feature -- Removal prune_message_transport (transport: P2P_MESSAGE_TRANSPORT) is -- Unregister message transport require Transport_existent: transport /= Void do transports.remove (transport.name) logger.info ("endpoint_service: Pruned message transport, name: " + transport.name + ", group: " + peer_group.id.out) ensure Transport_pruned: message_transport (transport.name) = Void end prune_incoming_message_filter (filter_target: ANY; namespace: STRING; name: STRING) is -- Unregister incoming message filter require Filter_target_valid: filter_target /= Void do prune_message_filter (filter_target, namespace, name, True) logger.info ("endpoint_service: Pruned incoming message filter, group: " + peer_group.id.out) end prune_outgoing_message_filter (filter_target: ANY; namespace: STRING; name: STRING) is -- Unregister outgoing message filter require Filter_target_valid: filter_target /= Void do prune_message_filter (filter_target, namespace, name, False) logger.info ("endpoint_service: Pruned outgoing message filter, group: " + peer_group.id.out) end prune_service (service_name, service_parameter: STRING) is -- Unregister service and its message listener require Service_valid: service_name /= Void local handler: STRING do handler := handler_name (service_name, service_parameter) services.remove (handler) if not handler.substring (1, group_mangling_service_prefix.count).is_equal (group_mangling_service_prefix) then -- unregister service in parent group if peer_group.parent_group.endpoint_service /= Current and peer_group.parent_group.endpoint_service /= Void then peer_group.parent_group.endpoint_service.prune_service (group_mangling_service_prefix + peer_group.id.out_short, handler) elseif peer_group.parent_group.id.is_worldgroup_id then -- as we don't have a running world peer group, we unregister the indirect service name also here services.remove (handler_name (group_mangling_service_prefix + peer_group.id.out_short, handler)) end end logger.info ("endpoint_service: Pruned service, name: " + handler + ", group: " + peer_group.id.out) ensure Service_pruned: not registered_services.has (handler_name (service_name, service_parameter)) end feature -- Basic operations start (args: ARRAY [STRING]) is -- Start module do module_status := start_ok end suspend is -- Suspend module do module_status := suspended end stop is -- Stop module do module_status := stop_ok end demux (message: P2P_MESSAGE) is -- Process an incoming message and forward it to the appropriate listener require Status_started: module_status = start_ok or module_status = suspended Message_existent: message /= Void local el_src_addr, el_dst_addr, el_src_peer: P2P_MESSAGE_ELEMENT source_address, dest_address: P2P_ENDPOINT_ADDRESS filtered_message: P2P_MESSAGE listener: PROCEDURE [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS]] listener_name: STRING do -- get source and destination endpoint address el_src_addr := message.jxta_element_by_name (endpoint_source_address_element_name) el_dst_addr := message.jxta_element_by_name (endpoint_destination_address_element_name) el_src_peer := message.jxta_element_by_name (endpoint_source_peer_element_name) if el_src_addr /= Void and el_dst_addr /= Void and (el_src_peer = Void or not el_src_peer.content.is_equal (peer_group.peer_id.out)) then logger.debugging ("endpoint_service: Demuxing incoming message, group: " + peer_group.id.out + ", source: " + el_src_addr.content + ", dest: " + el_dst_addr.content) -- clean endpoint elements in message message.remove (el_src_addr) message.remove (el_dst_addr) if el_src_peer /= Void then message.remove (el_src_peer) end create source_address.make_from_uri (el_src_addr.content) create dest_address.make_from_uri (el_dst_addr.content) if source_address.is_valid and dest_address.is_valid and dest_address.service_name /= Void then -- find effective destination address (group demangling) dest_address := effective_destination_address (dest_address) -- process filters filtered_message := process_filters (message, source_address, dest_address, True) if filtered_message /= Void then -- look for a listener if dest_address.service_parameter /= Void then listener_name := dest_address.service_name + "/" + dest_address.service_parameter if services.has (listener_name) then listener := services.item (listener_name) end end if listener = Void then listener_name := dest_address.service_name if services.has (listener_name) then listener := services.item (listener_name) end end if listener /= Void then -- call listener listener.call ([filtered_message, source_address, dest_address]) logger.debugging ("endpoint_service: Message forwarded to listener: " + listener_name + ", group id: " + peer_group.id.out) else -- message discarded: no listener found logger.info ("endpoint_service: Message discarded (no listener found)" + ", group id: " + peer_group.id.out) end else -- message filtered out logger.info ("endpoint_service: Message filtered out, source: " + source_address.out + ", dest: " + dest_address.out + ", group id: " + peer_group.id.out) end else -- message discarded: invalid addresses in message or no service name set logger.info ("endpoint_service: Discarded message (invalid addresses/no service name), group id: " + peer_group.id.out) end else -- message discarded: invalid message, no addresses set logger.info ("endpoint_service: Discarded new incoming, invalid message, group id: " + peer_group.id.out) end end ping (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is -- Does given peer respond? require Status_started: module_status = start_ok Address_processable: is_processable (address) local sender: P2P_MESSAGE_SENDER_TRANSPORT do logger.debugging ("endpoint_service: Pinging peer, address: " + address.out + ", group id: " + peer_group.id.out) if transports.has (address.protocol_name) then sender ?= transports.item (address.protocol_name) if sender /= Void then Result := sender.ping (address) logger.info ("endpoint_service: Pinged peer, address: " + address.out + ", group id: " + peer_group.id.out + ", responded: " + Result.out) end end end has_open_connection (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is -- Does an open connection to peer with given `address' exist? require Status_started: module_status = start_ok Address_processable: is_processable (address) local sender: P2P_MESSAGE_SENDER_TRANSPORT do if transports.has (address.protocol_name) then sender ?= transports.item (address.protocol_name) Result := sender /= Void and sender.has_open_connection (address) end logger.debugging ("endpoint_service: Looked for open connection, address: " + address.out + ", is open: " + Result.out + ", group id: " + peer_group.id.out) end propagate (message: P2P_MESSAGE; service_name, service_parameter: STRING) is -- Apply outgoing filters and propagate message with every registered transport protocol require Status_started: module_status = start_ok Message_valid: message /= Void Service_valid: service_name /= Void local modified_message, filtered_message: P2P_MESSAGE sender: P2P_MESSAGE_SENDER_TRANSPORT el_source_peer: P2P_MESSAGE_ELEMENT destination: P2P_ENDPOINT_ADDRESS cursor: DS_HASH_TABLE_CURSOR [P2P_MESSAGE_TRANSPORT, STRING] do if service_parameter /= Void then logger.debugging ("endpoint_service: Propagating message to service: " + service_name + ", param: " + service_parameter + ", group id: " + peer_group.id.out) else logger.debugging ("endpoint_service: Propagating message to service: " + service_name + ", group id: " + peer_group.id.out) end -- add source peer element modified_message := message.twin -- we shouldn't change callers message create el_source_peer.make_string (modified_message.namespace_jxta, endpoint_source_peer_element_name, Void, peer_group.peer_id.out) modified_message.replace (el_source_peer) -- loop through all protocols from cursor := transports.new_cursor cursor.start until cursor.after loop sender ?= cursor.item if sender /= Void then create destination.make_with_id (peer_group.id, service_name, service_parameter) filtered_message := process_filters (modified_message, sender.local_address, destination, False) sender.propagate (filtered_message, service_name, service_parameter) logger.debugging ("endpoint_service: Message propagated with transport: " + sender.name + ", group id: " + peer_group.id.out) end cursor.forth end end propagate_mangled (message: P2P_MESSAGE; service_name, service_parameter: STRING; redirection: P2P_PEERGROUP_ID) is -- Apply outgoing filters and propagate message with every registered transport protocol using a mangled address using `redirection' group require Status_started: module_status = start_ok Message_valid: message /= Void Service_valid: service_name /= Void Redirection_valid: redirection /= Void do propagate (message, group_mangling_service_prefix + redirection.out_short, handler_name (service_name, service_parameter)) end send_message (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE) is -- Apply outgoing filters and send message to a peer require Status_started: module_status = start_ok Address_processable: is_processable (address) and address.service_name /= Void Message_valid: message /= Void local sender: P2P_MESSAGE_SENDER_TRANSPORT filtered_message: P2P_MESSAGE do logger.debugging ("endpoint_service: Sending message to address: " + address.out + ", group id: " + peer_group.id.out) if transports.has (address.protocol_name) then sender ?= transports.item (address.protocol_name) if sender /= Void then filtered_message := process_filters (message.twin, sender.local_address, address, False) sender.send_message (address, filtered_message) logger.info ("endpoint_service: Message sent (queued) to address: " + address.out + ", group id: " + peer_group.id.out) end end end send_message_mangled (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE; redirection: P2P_PEERGROUP_ID) is -- Apply outgoing filters and send message to a peer using a mangled address using `redirection' group require Status_started: module_status = start_ok Address_processable: is_processable (address) and address.service_name /= Void Redirection_valid: redirection /= Void Message_valid: message /= Void local mangled_address: P2P_ENDPOINT_ADDRESS do create mangled_address.make_with_service (address.protocol_name, address.protocol_address, group_mangling_service_prefix + redirection.out_short, handler_name (address.service_name, address.service_parameter)) send_message (mangled_address, message) end feature {NONE} -- Implementation transports: DS_HASH_TABLE [P2P_MESSAGE_TRANSPORT, STRING] incoming_message_filters: DS_LINKED_LIST [ TUPLE [ FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING]] outgoing_message_filters: like incoming_message_filters services: DS_HASH_TABLE [PROCEDURE [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS]], STRING] prune_message_filter (filter_target: ANY; namespace: STRING; name: STRING; incoming: BOOLEAN) is -- Unregister message filter require Filter_target_valid: filter_target /= Void local cursor: DS_LIST_CURSOR [TUPLE [FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING]] curfilter: FUNCTION [ANY, TUPLE, P2P_MESSAGE] curnamespace, curname: STRING do from if incoming then cursor := incoming_message_filters.new_cursor else cursor := outgoing_message_filters.new_cursor end cursor.start until cursor.after loop curfilter ?= cursor.item.item (1) curnamespace ?= cursor.item.item (2) curname ?= cursor.item.item (3) if filter_target = curfilter.target and equal (namespace, curnamespace) and equal (name, curname) then cursor.remove else cursor.forth end end end process_filters (message: P2P_MESSAGE; source, destination: P2P_ENDPOINT_ADDRESS; incoming: BOOLEAN): P2P_MESSAGE is -- Process incoming/outgoing filters to message -- Return modified message or Void if message should be discarded require Status_started: module_status = start_ok Message_existent: message /= Void Addresses_existent: source /= Void and destination /= Void local filterlist: DS_LIST_CURSOR [TUPLE [ FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE], STRING, STRING]] filter: FUNCTION [ANY, TUPLE [P2P_MESSAGE, P2P_ENDPOINT_ADDRESS, P2P_ENDPOINT_ADDRESS], P2P_MESSAGE] namespace, name: STRING apply_filter: BOOLEAN do -- choose filter list if incoming then filterlist := incoming_message_filters.new_cursor else filterlist := outgoing_message_filters.new_cursor end -- loop through all filters from filterlist.start Result := message -- start with original message until Result = Void or filterlist.after loop -- apply filter? namespace ?= filterlist.item.item (2) name ?= filterlist.item.item (3) if namespace = Void and name = Void then apply_filter := True elseif namespace /= Void and name /= Void and message.element_by_namespace_and_name (namespace, name) /= Void then apply_filter := True elseif namespace /= Void and name = Void and message.uses_namespace (namespace) then apply_filter := True elseif namespace = Void and name /= Void and message.element_by_name (name) /= Void then apply_filter := True else apply_filter := False end -- call filter filter ?= filterlist.item.item (1) if apply_filter and filter /= Void then filter.call ([Result, source, destination]) Result := filter.last_result end filterlist.forth end end check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN is -- Are all needed dependencies met? do Result := True end effective_destination_address (an_address: P2P_ENDPOINT_ADDRESS): P2P_ENDPOINT_ADDRESS is -- Effective destination address of `an_address' (demangled service name) require Address_valid: an_address /= Void and an_address.is_valid and an_address.service_name /= Void local sname, sparm: STRING pos: INTEGER do if an_address.service_parameter /= Void and an_address.service_name.substring (1, group_mangling_service_prefix.count) .is_equal (group_mangling_service_prefix) then -- demangle service name/parameter pos := an_address.service_parameter.index_of ('/', 1) if pos /= 0 then sname := an_address.service_parameter.substring (1, pos - 1) sparm := an_address.service_parameter.substring (pos + 1, an_address.service_parameter.count) else sname := an_address.service_parameter end create Result.make_with_service (an_address.protocol_name, an_address.protocol_address, sname, sparm) else Result := an_address end ensure Result_set: Result /= Void and Result.is_valid end feature {NONE} -- Constants Initial_transports_count: INTEGER is 5 Initial_services_count: INTEGER is 10 end