indexing description: "Origo Core Service" license: "MIT license (see ../../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class O_P2P_MODULE inherit P2P_MODULE redefine init end O_CONSTANTS P2P_CREATORS_SHARED create init feature {NONE} -- Initialization init (a_group: like peer_group; an_id: like module_id; an_advertisement: like implementation_advertisement) is -- Initialize module local pid_equality_tester: KL_EQUALITY_TESTER [P2P_PEER_ID] service_equality_tester: KL_EQUALITY_TESTER [O_SERVICE] do Precursor (a_group, an_id, an_advertisement) create pid_equality_tester create service_equality_tester create {DS_LINKED_LIST [P2P_PEER_ID]} core_peers.make core_peers.set_equality_tester (pid_equality_tester) create {DS_LINKED_LIST [O_SERVICE]} local_services.make local_services.set_equality_tester (service_equality_tester) create core_event_handlers.make xml_document_creator.extend_custom_creator ("OrigoService", agent (an_element: XM_ELEMENT): O_SERVICE_ADVERTISEMENT do create Result.make_from_element (an_element) end) xml_document_creator.extend_custom_creator ("OrigoServiceRole", agent (an_element: XM_ELEMENT): O_SERVICE_ROLE_ADVERTISEMENT do create Result.make_from_element (an_element) end) xml_document_creator.extend_custom_creator ("OrigoServiceControl", agent (an_element: XM_ELEMENT): O_SERVICE_CONTROL_MESSAGE do create Result.make_from_element (an_element) end) discovery := peer_group.discovery_service endpoint := peer_group.endpoint_service endpoint_service_name := module_id.out is_core := peer_group.peer_advertisement.name.is_equal (core_peer_name) end feature -- Access endpoint_service_name: STRING -- Used service name for endpoint messages is_core: BOOLEAN -- Is current peer an Origo core? core_peers: DS_LIST [P2P_PEER_ID] -- List of discovered core peers local_services: DS_LIST [O_SERVICE] -- Registered local services has_core_connection: BOOLEAN is -- Is current peer connected to at least one Origo core? do Result := is_core or core_peers.count > 0 end feature -- Auxiliary discover_core is -- Send core discovery queries require Network_connected: peer_group.rendezvous_service.is_connected local query: P2P_DISCOVERY_QUERY do -- create discovery query create query.make (discovery.type_peer) query.set_threshold (10) query.set_restriction ("Name", core_peer_name) -- send discovery query logger.info ("p2p_module: Sending core peer discovery request, group id: " + peer_group.id.out) discovery.query_remote_advertisements (query, Void, Void) end register (a_service: O_SERVICE) is -- Register `a_service' at core require Service_valid: a_service /= Void do if not local_services.has (a_service) then -- save as local service local_services.put_last (a_service) -- publish configuration to already available core peers send_configuration (Void, a_service) end ensure Service_set: local_services.has (a_service) end unregister (a_service: O_SERVICE) is -- Unregister `a_service' at core require Service_existent: a_service /= Void and local_services.has (a_service) local cursor: DS_LIST_CURSOR [O_SERVICE] msg: O_SERVICE_CONTROL_MESSAGE do cursor := local_services.new_cursor cursor.start cursor.search_forth (a_service) if not cursor.after then cursor.remove end -- send unregister message to core create msg.make ({O_SERVICE_CONTROL_MESSAGE}.command_unregister_service) msg.add_parameter (a_service.configuration.unique_id) send_message (msg) ensure Service_removed: not local_services.has (a_service) end publish_configuration (a_service: O_SERVICE) is -- Publish current configuration to all cores require Service_valid: a_service /= Void do send_configuration (Void, a_service) end feature -- Core extend_core_event_handler (a_handler: PROCEDURE [ANY, TUPLE [O_SERVICE_CONTROL_MESSAGE]]) is -- Register core event handler `a_handler' under its target require Handler_valid: a_handler /= Void do if not core_event_handlers.has (a_handler) then core_event_handlers.put_last (a_handler) end end prune_core_event_handler (a_target: ANY) is -- Unregister core event handler for `a_target' require Target_valid: a_target /= Void local cursor: DS_LIST_CURSOR [PROCEDURE [ANY, TUPLE]] do from cursor := core_event_handlers.new_cursor cursor.start until cursor.after loop if cursor.item.target = a_target then cursor.remove -- and go forth else cursor.forth end end end feature -- Message Transport send_message (a_msg: O_SERVICE_CONTROL_MESSAGE) is -- Send a service control message `a_msg' to responsible peer(s) require Message_valid: a_msg /= Void and a_msg.is_valid do -- send message to possible local responsible services send_message_locally (a_msg) -- send message to remote services send_message_remotely (a_msg) end process_message (a_msg: P2P_MESSAGE; a_source, a_destination: P2P_ENDPOINT_ADDRESS) is -- Process incoming endpoint message require Message_valid: a_msg /= Void Source_valid: a_source /= Void DestinationO_valid: a_destination /= Void and a_destination.service_name.is_equal (endpoint_service_name) local msgel: P2P_MESSAGE_ELEMENT control_msg: O_SERVICE_CONTROL_MESSAGE do -- parse control message msgel := a_msg.element_by_namespace_and_name (message_namespace, message_element_control) if msgel /= Void then create control_msg.parse_from_string (msgel.content) if control_msg.is_valid then logger.info ("p2p_module: Origo service control message received. Passing to responsible service(s), source: " + a_source.out + ", command: " + control_msg.control_command.out + ", group id: " + peer_group.id.out) -- pass message to responsible service(s) send_message_locally (control_msg) else logger.error ("p2p_module: Invalid Origo service control message received, source: " + a_source.out + ", group id: " + peer_group.id.out) end end end process_discovery_response (a_response: P2P_DISCOVERY_RESPONSE) is -- Process discovery response require Response_valid: a_response /= Void and a_response.is_valid local peer_advs: DS_LIST [P2P_PEER_ADVERTISEMENT] new_cores: DS_LINKED_LIST [P2P_PEER_ID] adv_cursor: DS_LIST_CURSOR [P2P_ADVERTISEMENT] sadv: O_SERVICE_ADVERTISEMENT radv: O_SERVICE_ROLE_ADVERTISEMENT msg: O_SERVICE_CONTROL_MESSAGE do if a_response.type = discovery.type_peer then logger.debugging ("p2p_module: Processing discovery peer advertisement responses, count: " + a_response.count.out + ", group id: " + peer_group.id.out) create new_cores.make -- publish peer advs locally and add new cores from peer_advs := a_response.all_peer_advertisements peer_advs.start until peer_advs.after loop discovery.publish_advertisement_locally (peer_advs.item_for_iteration) if peer_advs.item_for_iteration.name.is_equal (core_peer_name) and not core_peers.has (peer_advs.item_for_iteration.peer_id) then core_peers.put_last (peer_advs.item_for_iteration.peer_id) new_cores.put_last (peer_advs.item_for_iteration.peer_id) logger.info ("p2p_module: Discovered new core peer, peer id: " + peer_advs.item_for_iteration.peer_id.out + ", group id: " + peer_group.id.out) end peer_advs.forth end -- publish entire configuration to found core peer(s) from new_cores.start until new_cores.after loop send_configuration (new_cores.item_for_iteration, Void) new_cores.forth end elseif a_response.type = discovery.type_general then logger.debugging ("p2p_module: Processing discovery general advertisement responses, count: " + a_response.count.out + ", group id: " + peer_group.id.out) -- save source peer advertisement if a_response.peer_advertisement /= Void then discovery.publish_advertisement_locally (a_response.peer_advertisement) end -- save incoming service (role) advertisements if is_core then create msg.make ({O_SERVICE_CONTROL_MESSAGE}.command_received_service_configurations) end from adv_cursor := a_response.responses.new_cursor adv_cursor.start until adv_cursor.after loop sadv ?= adv_cursor.item radv ?= adv_cursor.item if sadv /= Void or radv /= Void then discovery.publish_advertisement_locally (adv_cursor.item) if is_core then msg.add_parameter (adv_cursor.item.unique_id) end end adv_cursor.forth end if is_core and msg.parameters.count > 0 then call_core_handlers (msg) end end end feature -- Module operations start (args: ARRAY [STRING_8]) -- Start module do if module_status = initializing then -- Register endpoint message handler endpoint.extend_service (endpoint_service_name, Void, agent process_message) -- Register discovery response handler discovery.extend_response_listener (agent process_discovery_response) end -- Register rendezvous event handler peer_group.rendezvous_service.extend_rendezvous_event_handler (agent process_rendezvous_event) module_status := start_ok end suspend -- Suspend service do -- Unregister rendezvous event handler peer_group.rendezvous_service.prune_rendezvous_event_handler (Current) module_status := suspended end stop -- Stop module in any case do -- Unregister discovery response handler discovery.remove_response_listener (Current) -- Unregister endpoint message handler endpoint.prune_service (endpoint_service_name, Void) module_status := stop_ok end feature {NONE} -- Implementation core_event_handlers: DS_LINKED_LIST [PROCEDURE [ANY, TUPLE [O_SERVICE_CONTROL_MESSAGE]]] discovery: P2P_DISCOVERY_SERVICE endpoint: P2P_ENDPOINT_SERVICE check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN -- Are all needed dependencies met? do Result := a_parent.endpoint_service /= Void and a_parent.discovery_service /= Void and a_parent.rendezvous_service /= Void end process_rendezvous_event (an_event: P2P_RENDEZVOUS_EVENT) is -- Discover core peers as soon as we're rdv or have rdv connection require Event_valid: an_event /= Void do if not is_core and (an_event.type = {P2P_RENDEZVOUS_EVENT}.type_connected_to_rendezvous or an_event.type = {P2P_RENDEZVOUS_EVENT}.type_became_rendezvous) then discover_core elseif is_core then discovery.publish_advertisement_remotely (peer_group.peer_advertisement, Void) end end send_configuration (a_peer_id: P2P_PEER_ID; a_service: O_SERVICE) is -- Send local services or only `a_service' configuration to all cores or only `a_peer_id' require Peer_id_valid: a_peer_id /= Void implies a_peer_id.is_valid local adv_list: DS_LINKED_LIST [P2P_ADVERTISEMENT] service_cursor: DS_LIST_CURSOR [O_SERVICE] role_cursor: DS_LIST_CURSOR [O_SERVICE_ROLE_ADVERTISEMENT] discovery_response: P2P_DISCOVERY_RESPONSE peer_cursor: DS_LIST_CURSOR [P2P_PEER_ID] msg: O_SERVICE_CONTROL_MESSAGE do if a_peer_id /= Void or has_core_connection then logger.debugging ("p2p_module: Publishing configuration, to all cores: " + (a_peer_id = Void).out + ", all services: " + (a_service = Void).out + ", group id: " + peer_group.id.out) -- create configuration advertisements list create adv_list.make from service_cursor := local_services.new_cursor service_cursor.start until service_cursor.after loop if a_service = Void or service_cursor.item = a_service then -- add general service configuration as parameter adv_list.put_last (service_cursor.item.configuration) -- add service role configurations as parameters from role_cursor := service_cursor.item.registered_roles.new_cursor role_cursor.start until role_cursor.after loop adv_list.put_last (role_cursor.item) role_cursor.forth end if a_service /= Void then service_cursor.go_after else service_cursor.forth end else service_cursor.forth end end if adv_list.count > 0 then if is_core then -- Publish the advertisements locally and notify the core handlers about the new configurations create msg.make ({O_SERVICE_CONTROL_MESSAGE}.command_received_service_configurations) from adv_list.start until adv_list.after loop discovery.publish_advertisement_locally (adv_list.item_for_iteration) msg.add_parameter (adv_list.item_for_iteration.unique_id) adv_list.forth end call_core_handlers (msg) else -- send advertisements as discovery responses to core(s) create discovery_response.make_with_response (discovery.type_general, adv_list) discovery_response.set_peer_advertisement (peer_group.peer_advertisement) if a_peer_id = Void then from peer_cursor := core_peers.new_cursor peer_cursor.start until peer_cursor.after loop discovery.publish_advertisements_remotely (discovery_response, peer_cursor.item) peer_cursor.forth end else discovery.publish_advertisements_remotely (discovery_response, a_peer_id) end logger.info ("p2p_module: Published advertisements, all cores: " + (a_peer_id = Void).out + ", all configs: " + (a_service = Void).out + ", group id: " + peer_group.id.out) end end end end send_message_locally (a_msg: O_SERVICE_CONTROL_MESSAGE) is -- Pass service control message `a_msg' to responsible service/role (local only) require Message_valid: a_msg /= Void and a_msg.is_valid local service_cursor: DS_LIST_CURSOR [O_SERVICE] do logger.info ("p2p_module: Sending message locally, command: " + a_msg.control_command.out + ", group id: " + peer_group.id.out) if a_msg.service_type < 0 then -- messages's destination is the core if is_core then -- call core handler call_core_handlers (a_msg) end else from service_cursor := local_services.new_cursor service_cursor.start until service_cursor.after loop if a_msg.service_type = service_cursor.item.configuration.type and (a_msg.service_name = Void or a_msg.service_name.is_equal (service_cursor.item.configuration.name)) then service_cursor.item.process_network_message (a_msg) end service_cursor.forth end end end send_message_remotely (a_msg: O_SERVICE_CONTROL_MESSAGE) is -- Pass service control message `a_msg' to responsible service/role (local only) require Message_valid: a_msg /= Void and a_msg.is_valid local msg: P2P_MESSAGE msgel: P2P_MESSAGE_ELEMENT pid_cursor: DS_LIST_CURSOR [P2P_PEER_ID] address: P2P_ENDPOINT_ADDRESS peer_id: P2P_PEER_ID role_adv: O_SERVICE_ROLE_ADVERTISEMENT directly_sent: BOOLEAN do logger.info ("p2p_module: Sending message remotely, command: " + a_msg.control_command.out + ", group id: " + peer_group.id.out) -- create endpoint message create msg.make create msgel.make_xml (message_namespace, message_element_control, Void, a_msg.out) msg.extend (msgel) if is_core and a_msg.service_type >= 0 then -- Send message to all responsible peers, if we're the core from pid_cursor := responsible_remote_services (a_msg.service_type, a_msg.service_name, a_msg.service_role).new_cursor pid_cursor.start until pid_cursor.after loop -- local services are handled elsewhere (`send_message_locally') if not peer_group.peer_id.is_equal (pid_cursor.item) then create address.make_with_id (pid_cursor.item, endpoint_service_name, Void) endpoint.send_message (address, msg) end pid_cursor.forth end directly_sent := True -- messages with fully specified recipients are going to be sent directly, others via core peer elseif a_msg.service_type >= 0 and a_msg.service_name /= Void and a_msg.service_role /= Void then -- do we have the corresponding service role advertisement available? role_adv ?= discovery.local_general_advertisement ("OrigoServiceRole/" + a_msg.service_type.out + "/" + a_msg.service_name + "/" + a_msg.service_role) if role_adv /= Void then -- only send to remote peers (local sending should be done with `send_message_locally') if not role_adv.peer_id.is_equal (peer_group.peer_id) then create address.make_with_id (role_adv.peer_id, endpoint_service_name, Void) endpoint.send_message (address, msg) end directly_sent := True end end if not is_core and core_peers.count > 0 and not directly_sent then -- send message to a random core peer_id := random_core create address.make_with_id (peer_id, endpoint_service_name, Void) endpoint.send_message (address, msg) end end random_core: P2P_PEER_ID is -- Random core peer id require Cores_available: core_peers.count > 0 do Result := core_peers.first -- ok, pseudorandom ;) ensure Result_set: Result /= Void and Result.is_valid end call_core_handlers (a_msg: O_SERVICE_CONTROL_MESSAGE) is -- Call core handlers with given `a_msg' require Message_valid: a_msg /= Void and a_msg.is_valid local core_cursor: DS_LIST_CURSOR [PROCEDURE [ANY, TUPLE [O_SERVICE_CONTROL_MESSAGE]]] do from core_cursor := core_event_handlers.new_cursor core_cursor.start until core_cursor.after loop core_cursor.item.call ([a_msg]) core_cursor.forth end end responsible_remote_services (a_type: INTEGER; a_name, a_role: STRING): DS_LIST [P2P_PEER_ID] is -- Peer ids of peers having responsible services for given service restrictions require Type_valid: a_type >= 0 local adv_cursor: DS_LIST_CURSOR [P2P_ADVERTISEMENT] sadv: O_SERVICE_ADVERTISEMENT radv: O_SERVICE_ROLE_ADVERTISEMENT services: DS_HASH_TABLE [O_SERVICE_ADVERTISEMENT, STRING] string_equality_tester: KL_EQUALITY_TESTER [STRING] pid_equality_tester: KL_EQUALITY_TESTER [P2P_PEER_ID] result_set: DS_HASH_SET [P2P_PEER_ID] do -- get all advertisements with given `a_type' adv_cursor := discovery.local_general_advertisements ("Type", a_type.out).new_cursor if a_name = Void or a_role = Void then -- get all possible service names and roles (complete when necessary) create string_equality_tester create services.make_default services.set_key_equality_tester (string_equality_tester) from adv_cursor.start until adv_cursor.after loop -- look only at service advertisements sadv ?= adv_cursor.item if sadv /= Void and (a_name = Void or (a_name.is_equal (sadv.name) and a_role = Void)) then services.force (sadv, sadv.name) end adv_cursor.forth end end -- search for all matching service role advertisements for getting their peer ids create result_set.make_default create pid_equality_tester result_set.set_equality_tester (pid_equality_tester) from adv_cursor.start until adv_cursor.after loop radv ?= adv_cursor.item if radv /= Void then if (a_name /= Void and a_name.is_equal (radv.name)) or (a_name = Void and services.has (radv.name)) then if (a_role /= Void and a_role.is_equal (radv.role)) or (a_role = Void and services.has (radv.name) and services.item (radv.name).roles.has (radv.role)) then result_set.force (radv.peer_id) end end end adv_cursor.forth end -- convert result hash set to list create {DS_ARRAYED_LIST [P2P_PEER_ID]} Result.make_from_linear (result_set) Result.set_equality_tester (pid_equality_tester) end end