indexing description: "Provides Rendezvous Service" license: "MIT license (see ../../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class P2P_RENDEZVOUS_SERVICE inherit P2P_MODULE redefine init end THREAD export {NONE} all end DT_SHARED_SYSTEM_CLOCK export {NONE} all end L4E_PRIORITY_CONSTANTS export {NONE} all end P2P_RANDOM_SHARED P2P_EXCEPTION_LOG create init feature {NONE} -- Initialization init (group: P2P_PEERGROUP; id: P2P_ID; advertisement: like implementation_advertisement) is -- Initialize module local peer_id_equality_tester: KL_EQUALITY_TESTER [P2P_PEER_ID] address_equality_tester: KL_EQUALITY_TESTER [P2P_ENDPOINT_ADDRESS] conf: P2P_RENDEZVOUS_CONFIGURATION logging_message: STRING do Precursor (group, id, advertisement) -- read configuration to get available rendezvous peers conf ?= peer_group.configuration.service_parameter (rendezvous_mcid) if conf /= Void and conf.is_valid and conf.type = conf.type_client then -- seed uris are already resolved to addresses by the platform create {DS_ARRAYED_LIST [P2P_ENDPOINT_ADDRESS]} available_rendezvous.make_from_linear (conf.seed_peers) -- create data structures create msg_id_list.make (0, msg_id_storage_size - 1) msg_id_list.compare_objects create msg_id_list_lock.make create connected_rdv.make_default create connected_rdv_lock.make create peer_id_equality_tester connected_rdv.set_key_equality_tester (peer_id_equality_tester) create address_equality_tester available_rendezvous.set_equality_tester (address_equality_tester) create rendezvous_event_handlers.make -- set some parameters propagate_service_parameter := peer_group.id.out_short propagate_msg_element_name := propagate_msg_element_name_prefix + peer_group.id.out_short lease_service_name := id.out lease_service_parameter := peer_group.id.out_short -- Only rendezvous client implemented is_rendezvous := False -- Log available rendezvous if logger.is_enabled_for (info_p) then logging_message := "rendezvous_service: Initialized, available rendezvous peers:" from available_rendezvous.start until available_rendezvous.after loop logging_message.append (" " + available_rendezvous.item_for_iteration.out_without_service) available_rendezvous.forth end logger.info (logging_message + ", group id: " + peer_group.id.out) end else module_status := init_failed logger.error ("rendezvous_service: Invalid configuration or unimplemented mode set, id: " + module_id.out + ", group id: " + peer_group.id.out) end end feature -- Status report is_rendezvous: BOOLEAN -- Is local peer a rendezvous? feature -- Rendezvous client Ttl_max: INTEGER is 50 available_rendezvous: DS_LIST [P2P_ENDPOINT_ADDRESS] -- Available rendezvous peers add_available_rendezvous (an_address: P2P_ENDPOINT_ADDRESS) is -- Add available rendezvous require Status_valid: module_status = start_ok or module_status = suspended do if not available_rendezvous.has (an_address) then available_rendezvous.put_last (an_address) end end connect (a_rendezvous_peer: P2P_ENDPOINT_ADDRESS) is -- Connect to `a_rendezvous_peer' require Status_valid: module_status = start_ok Peer_valid: a_rendezvous_peer /= Void and a_rendezvous_peer.is_valid Module_started: module_status = start_ok local peer_id: P2P_PEER_ID do add_available_rendezvous (a_rendezvous_peer) peer_id := a_rendezvous_peer.peer_id if peer_id /= Void and not is_connected_to_rendezvous (peer_id) then send_lease_request (a_rendezvous_peer, True) elseif peer_id = Void then send_lease_request (a_rendezvous_peer, True) end logger.info ("rendezvous_service: Connect request sent to: " + a_rendezvous_peer.out_without_service + ", group id: " + peer_group.id.out) end disconnect (a_rendezvous_peer: P2P_PEER_ID) is -- Disconnect from `a_rendezvous_peer' require Status_valid: module_status = start_ok or module_status = suspending Peer_valid: a_rendezvous_peer /= Void and a_rendezvous_peer.is_valid Module_started: module_status = start_ok or module_status = suspending local address: P2P_ENDPOINT_ADDRESS do if is_connected_to_rendezvous (a_rendezvous_peer) then create address.make_with_id (a_rendezvous_peer, Void, Void) send_lease_request (address, False) remove_connected_rendezvous (a_rendezvous_peer) logger.info ("rendezvous_service: Disconnect request sent to: " + a_rendezvous_peer.out + ", group id: " + peer_group.id.out) -- call event handlers call_event_handlers ({P2P_RENDEZVOUS_EVENT}.type_disconnected_from_rendezvous, a_rendezvous_peer) end end is_connected: BOOLEAN is -- Are we connected to the minimal number of rendezvous peers? require Status_valid: module_status = start_ok or module_status = suspended or module_status = suspending do Result := connected_rdv.count >= min_connected_rendezvous_number end is_connected_to_rendezvous (a_rendezvous_peer: P2P_PEER_ID): BOOLEAN is -- Are we connected to `a_rendezvous_peer'? require Status_valid: module_status = start_ok or module_status = suspended or module_status = suspending Peer_valid: a_rendezvous_peer /= Void and a_rendezvous_peer.is_valid do connected_rdv_lock.acquire_read_lock Result := connected_rdv.has (a_rendezvous_peer) connected_rdv_lock.release_reader_lock ensure Result_set: Result = connected_rendezvous.has (a_rendezvous_peer) rescue connected_rdv_lock.release_reader_lock end connected_rendezvous: DS_LIST [P2P_PEER_ID] is -- List of rendezvous peers we're connected to require Status_valid: module_status = start_ok or module_status = suspended or module_status = suspending local cursor: DS_HASH_TABLE_CURSOR [TUPLE, P2P_PEER_ID] do connected_rdv_lock.acquire_read_lock from create {DS_ARRAYED_LIST [P2P_PEER_ID]} Result.make (connected_rdv.count) cursor := connected_rdv.new_cursor cursor.start until cursor.after loop Result.put_last (cursor.key) cursor.forth end connected_rdv_lock.release_reader_lock ensure Result_set: Result /= Void rescue connected_rdv_lock.release_reader_lock end process_lease_protocol_message (a_message: P2P_MESSAGE; a_source, a_destination: P2P_ENDPOINT_ADDRESS) is -- Process incoming message from endpoint service require Module_started: module_status = start_ok or module_status = suspended or module_status = suspending Message_valid: a_message /= Void Source_valid: a_source /= Void and a_source.is_valid Destination_valid: a_destination /= Void and a_destination.is_valid do if module_status = start_ok then if a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_connected_id_element_name) /= Void then process_lease_granted_message (a_message) elseif a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_disconnect_element_name) /= Void then process_lease_disconnect_message (a_message) end end end extend_rendezvous_event_handler (a_handler: PROCEDURE [ANY, TUPLE [P2P_RENDEZVOUS_EVENT]]) is -- Register rendezvous event handler `a_handler' with its target require Handler_valid: a_handler /= Void do rendezvous_event_handlers.put_last (a_handler) end prune_rendezvous_event_handler (a_target: ANY) is -- Prune rendezvous event handler registered for `a_target' require Target_valid: a_target /= Void local handler_cursor: DS_LIST_CURSOR [PROCEDURE [ANY, TUPLE [P2P_RENDEZVOUS_EVENT]]] do from handler_cursor := rendezvous_event_handlers.new_cursor handler_cursor.start until handler_cursor.after loop if handler_cursor.item.target = a_target then handler_cursor.remove -- and move forth else handler_cursor.forth end end end feature -- Message propagation propagate_to_peers (a_message: P2P_MESSAGE; peers: DS_LIST [P2P_PEER_ID]; a_service_name, a_service_parameter: STRING; a_ttl: INTEGER) is -- Propagate `a_message' to specified `peers' require Module_started: module_status = start_ok or module_status = suspended Message_valid: a_message /= Void Peers_valid: peers /= Void Service_name_valid: a_service_name /= Void Ttl_valid: a_ttl > 0 local propmsg: P2P_RENDEZVOUS_PROPAGATE peer_cursor: DS_LIST_CURSOR [P2P_PEER_ID] peer_address: P2P_ENDPOINT_ADDRESS log_parameter: STRING do if a_service_parameter /= Void then log_parameter := ", service parameter: " + a_service_parameter else create log_parameter.make_empty end logger.debugging ("rendezvous_service: Propagating message to peers, service: " + a_service_name + log_parameter + ", group id: " + peer_group.id.out) -- create/update propagation message propmsg := update_propagation_element (a_message, a_service_name, a_service_parameter, a_ttl) store_message_id (propmsg.message_id) from peer_cursor := peers.new_cursor peer_cursor.start until peer_cursor.after loop -- check if peer already has processed message if not propmsg.has_path (peer_cursor.item.out) then create peer_address.make_with_id (peer_cursor.item, propagate_service_name, propagate_service_parameter) peer_group.endpoint_service.send_message_mangled (peer_address, a_message.twin, peer_group.id) end peer_cursor.forth end logger.info ("rendezvous_service: Propagated message to peers, service: " + a_service_name + log_parameter + ", group id: " + peer_group.id.out) end propagate (a_message: P2P_MESSAGE; a_service_name, a_service_parameter: STRING; a_ttl: INTEGER) is -- Propagate `a_message' to local neighbours and in peer group require Module_started: module_status = start_ok Message_valid: a_message /= Void Service_name_valid: a_service_name /= Void Ttl_valid: a_ttl > 0 Connected: is_connected do propagate_to_neighbours (a_message, a_service_name, a_service_parameter, a_ttl) propagate_in_group (a_message, a_service_name, a_service_parameter, a_ttl) end propagate_to_neighbours (a_message: P2P_MESSAGE; a_service_name, a_service_parameter: STRING; a_ttl: INTEGER) is -- Propagate `a_message' to local neighbours require Module_started: module_status = start_ok or module_status = suspended Message_valid: a_message /= Void Service_name_valid: a_service_name /= Void Ttl_valid: a_ttl > 0 local propmsg: P2P_RENDEZVOUS_PROPAGATE log_parameter: STRING do if a_service_parameter /= Void then log_parameter := ", service parameter: " + a_service_parameter else create log_parameter.make_empty end logger.debugging ("rendezvous_service: Propagating message to neighbours, service: " + a_service_name + log_parameter + ", group id: " + peer_group.id.out) -- create/update propagation message propmsg := update_propagation_element (a_message, a_service_name, a_service_parameter, a_ttl) store_message_id (propmsg.message_id) -- send message via endpoint service propagation (broadcasting) peer_group.endpoint_service.propagate_mangled (a_message, propagate_service_name, propagate_service_parameter, peer_group.id) logger.info ("rendezvous_service: Propagated message to neighbours, service: " + a_service_name + log_parameter + ", group id: " + peer_group.id.out) end propagate_in_group (a_message: P2P_MESSAGE; a_service_name, a_service_parameter: STRING; a_ttl: INTEGER) is -- Propagate `a_message' in peer group require Module_started: module_status = start_ok Message_valid: a_message /= Void Service_name_valid: a_service_name /= Void Ttl_valid: a_ttl > 0 do -- send message to connected rendezvous propagate_to_peers (a_message, connected_rendezvous, a_service_name, a_service_parameter, a_ttl) end process_propagated_message (a_message: P2P_MESSAGE; a_source, a_destination: P2P_ENDPOINT_ADDRESS) is -- Process incoming message from endpoint service require Module_started: module_status = start_ok or module_status = suspended Message_valid: a_message /= Void Source_valid: a_source /= Void and a_source.is_valid Destination_valid: a_destination /= Void and a_destination.is_valid local el_propmsg, el_src, el_real_dest: P2P_MESSAGE_ELEMENT propmsg: P2P_RENDEZVOUS_PROPAGATE real_dest: P2P_ENDPOINT_ADDRESS do logger.debugging ("rendezvous_service: Receiving message, source: " + a_source.out + ", destination: " + a_destination.out + ", group id: " + peer_group.id.out) -- fetch propagate message element el_propmsg := a_message.element_by_namespace_and_name (a_message.namespace_jxta, propagate_msg_element_name) propmsg := check_propagation_message (el_propmsg) if propmsg /= Void then -- Store message id store_message_id (propmsg.message_id) -- create new source endpoint address create el_src.make_string (a_message.namespace_jxta, peer_group.endpoint_service.endpoint_source_address_element_name, Void, a_source.out) a_message.replace (el_src) -- create new destination endpoint address with real service name/parameter create real_dest.make_with_service (a_destination.protocol_name, a_destination.protocol_address, propmsg.destination_service_name, propmsg.destination_service_parameter) create el_real_dest.make_string (a_message.namespace_jxta, peer_group.endpoint_service.endpoint_destination_address_element_name, Void, real_dest.out) a_message.replace (el_real_dest) -- let the endpoint service demux the adapted message logger.debugging ("rendezvous_service: Handing propagated message to listeners, source: " + a_source.out + ", new destination: " + real_dest.out + ", group id: " + peer_group.id.out) peer_group.endpoint_service.demux (a_message) else logger.info ("rendezvous_service: Received message with invalid propagate element; message discarded, source: " + a_source.out + ", destination: " + a_destination.out + ", group id: " + peer_group.id.out) end end feature -- Basic operations start (args: ARRAY [STRING]) is -- start module local seed_cursor: DS_LIST_CURSOR [P2P_ENDPOINT_ADDRESS] do if module_status = initializing then -- Register our service listeners in endpoint service peer_group.endpoint_service.extend_service (propagate_service_name, propagate_service_parameter, agent process_propagated_message) peer_group.endpoint_service.extend_service (lease_service_name, Void, agent process_lease_protocol_message) -- prune un-processable available rendezvous from seed_cursor := available_rendezvous.new_cursor seed_cursor.start until seed_cursor.after loop -- FIXME 20070104 beatstr: TCP module should be able to support IPv6 if not peer_group.endpoint_service.is_processable (seed_cursor.item) or seed_cursor.item.has_ipv6_address then logger.debugging ("rendezvous_service: Unprocessable available rendezvous removed: " + seed_cursor.item.out + ", group id: " + peer_group.id.out) seed_cursor.remove -- and move forth else seed_cursor.forth end end end -- mark module as started, so the connection manager runs properly module_status := start_ok -- start rendezvous connection manager launch end suspend is -- Suspend module do -- tell rendezvous connection manager to stop module_status := suspending -- wait for rendezvous connection manager to stop join module_status := suspended end stop is -- stop module do -- Unregister our service listeners in endpoint service peer_group.endpoint_service.prune_service (propagate_service_name, propagate_service_parameter) peer_group.endpoint_service.prune_service (lease_service_name, Void) -- destroy mutexes msg_id_list_lock.lock msg_id_list_lock.unlock msg_id_list_lock.destroy connected_rdv_lock.acquire_write_lock connected_rdv_lock.release_writer_lock connected_rdv_lock.destroy module_status := stop_ok end feature {NONE} -- Message propagation protocol Propagate_service_name: STRING is "JxtaPropagate" propagate_service_parameter: STRING -- peer group id Propagate_msg_element_name_prefix: STRING is "RendezVousPropagate" Propagate_msg_element_name: STRING -- msg_element_name_prefix + peer group unique id msg_id_list: ARRAY [ARRAY [NATURAL_8]] msg_id_list_cursor: INTEGER Msg_id_storage_size: INTEGER is 512 msg_id_list_lock: MUTEX check_propagation_message (a_message_element: P2P_MESSAGE_ELEMENT): P2P_RENDEZVOUS_PROPAGATE is -- `a_message''s propagate element, if valid local propmsg: P2P_RENDEZVOUS_PROPAGATE do if a_message_element /= Void then -- parse propagate document create propmsg.parse_from_string (a_message_element.content) if propmsg.is_valid then -- check TTL, check message id for duplicate and detect loop if propmsg.ttl > 0 and not is_known_message_id (propmsg.message_id) and not propmsg.has_path (peer_group.peer_id.out) then Result := propmsg end end end end is_known_message_id (a_msg_id: ARRAY [NATURAL_8]): BOOLEAN is -- Is given `a_msg_id' a known message id? do msg_id_list_lock.lock Result := msg_id_list.has (a_msg_id) msg_id_list_lock.unlock rescue msg_id_list_lock.unlock end store_message_id (a_msg_id: ARRAY [NATURAL_8]) is -- Store `a_msg_id' in list. Keep maximal `Msg_id_storage_size' entries in list. do msg_id_list_lock.lock msg_id_list.put (a_msg_id, msg_id_list_cursor) msg_id_list_cursor := (msg_id_list_cursor + 1) \\ msg_id_storage_size msg_id_list_lock.unlock rescue msg_id_list_lock.unlock end update_propagation_element (a_message: P2P_MESSAGE; a_service_name, a_service_parameter: STRING; a_ttl: INTEGER): P2P_RENDEZVOUS_PROPAGATE is -- Update propagation message element and return new propagation message require Message_valid: a_message /= Void Service_name_valid: a_service_name /= Void local el_propmsg: P2P_MESSAGE_ELEMENT do -- fetch propagate message element, if already available el_propmsg := a_message.element_by_namespace_and_name (a_message.namespace_jxta, propagate_msg_element_name) if el_propmsg /= Void then -- parse propagate document create Result.parse_from_string (el_propmsg.content) if Result.is_valid and not Result.has_path (peer_group.peer_id.out) then -- update propagate message and save it in message Result.decrease_ttl Result.add_path (peer_group.peer_id.out) el_propmsg.set_content (Result.out) elseif not Result.is_valid then Result := Void end end if Result = Void then -- create new propagate message with new message id create Result.make (a_service_name, a_service_parameter, a_ttl.min (ttl_max), peer_group.peer_id.out) create el_propmsg.make_xml (a_message.namespace_jxta, propagate_msg_element_name, Void, Result.out) a_message.replace (el_propmsg) end ensure Result_set: Result /= Void end feature {NONE} -- Rendezvous lease protocol connected_rdv: DS_HASH_TABLE [TUPLE [DT_DATE_TIME, DT_DATE_TIME], P2P_PEER_ID] connected_rdv_lock: READ_WRITE_LOCK last_tried_rendezvous: P2P_ENDPOINT_ADDRESS Lease_connect_element_name: STRING is "Connect" Lease_connected_lease_element_name: STRING is "ConnectedLease" Lease_connected_id_element_name: STRING is "ConnectedPeer" Lease_connected_adv_element_name: STRING is "RdvAdvReply" Lease_disconnect_element_name: STRING is "Disconnect" lease_service_name: STRING -- assigned module id lease_service_parameter: STRING -- peer group unique id Connection_manager_processing_interval: INTEGER is 20 -- 10s Connection_manager_sleep_interval: INTEGER_64 is 500000000 -- 500ms Lease_renewal_min: INTEGER is 300000 -- 5min Min_connected_rendezvous_number: INTEGER is 1 send_lease_request (a_rendezvous: P2P_ENDPOINT_ADDRESS; should_connect: BOOLEAN) is -- Send lease connect or disconnect request to `a_rendezvous' require Module_started: module_status = start_ok or module_status = suspending Rendezvous_valid: a_rendezvous /= Void and a_rendezvous.is_valid local message: P2P_MESSAGE request_name: STRING el_peer_adv: P2P_MESSAGE_ELEMENT address: P2P_ENDPOINT_ADDRESS do -- create lease request message create message.make if should_connect then request_name := lease_connect_element_name else request_name := lease_disconnect_element_name end create el_peer_adv.make_xml (message.namespace_jxta, request_name, Void, peer_group.peer_advertisement.out) message.extend (el_peer_adv) -- create rendezvous endpoint address create address.make_with_service (a_rendezvous.protocol_name, a_rendezvous.protocol_address, lease_service_name, lease_service_parameter) -- send message to rendezvous peer peer_group.endpoint_service.send_message_mangled (address, message, peer_group.id) end process_lease_granted_message (a_message: P2P_MESSAGE) is -- Process incoming lease granted message require Module_started: module_status = start_ok Message_valid: a_message /= Void local el_lease, el_id, el_adv: P2P_MESSAGE_ELEMENT lease: INTEGER peer_id: P2P_PEER_ID rdv_adv: P2P_PEER_ADVERTISEMENT lease_until: DT_DATE_TIME renew_lease_at: DT_DATE_TIME is_renewal: BOOLEAN do -- fetch message elements for lease time, peer id and advertisement el_lease := a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_connected_lease_element_name) el_id := a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_connected_id_element_name) el_adv := a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_connected_adv_element_name) -- parse time, peer id (and advertisement) if el_lease /= Void and el_lease.content.is_integer then lease := el_lease.content.to_integer end create peer_id.make_from_urn (el_id.content) if el_adv /= Void then create rdv_adv.parse_from_string (el_adv.content) if rdv_adv.is_valid then if peer_group.discovery_service /= Void then if lease > 0 then rdv_adv.set_lifetime_relative (lease.as_integer_64 * 2) else rdv_adv.set_lifetime_relative (rdv_adv.expiration_time) end -- save rendezvous peer advertisement peer_group.discovery_service.publish_advertisement_locally (rdv_adv) end else logger.error ("rendezvous_service: Discarded rendezvous peer advertisement (invalid)") end end if lease > 0 and peer_id.is_valid and module_status = start_ok then -- add rendezvous to connected rendezvous list lease_until := utc_system_clock.date_time_now renew_lease_at := lease_until.twin lease_until.add_milliseconds (lease) renew_lease_at.add_milliseconds (lease_renewal_min.max (lease // 2)) connected_rdv_lock.acquire_write_lock is_renewal := connected_rdv.has (peer_id) connected_rdv.force ([lease_until, renew_lease_at], peer_id) connected_rdv_lock.release_writer_lock last_tried_rendezvous := Void logger.info ("rendezvous_service: Connected to rendezvous peer: " + peer_id.out + ", group id: " + peer_group.id.out) -- call event handlers if is_renewal then call_event_handlers ({P2P_RENDEZVOUS_EVENT}.type_reconnected_to_rendezvous, peer_id) else call_event_handlers ({P2P_RENDEZVOUS_EVENT}.type_connected_to_rendezvous, peer_id) end else logger.error ("rendezvous_service: Failed to process incoming ConnectedLease message, group id: " + peer_group.id.out) end rescue if connected_rdv_lock.is_set then connected_rdv_lock.release_writer_lock end end process_lease_disconnect_message (a_message: P2P_MESSAGE) is -- Process incoming lease disconnect message require Module_started: module_status = start_ok Message_valid: a_message /= Void local el_adv: P2P_MESSAGE_ELEMENT rdv_adv: P2P_RENDEZVOUS_ADVERTISEMENT do -- parse peer advertisement el_adv := a_message.element_by_namespace_and_name (a_message.namespace_jxta, lease_disconnect_element_name) create rdv_adv.parse_from_string (el_adv.content) -- remove rendezvous from connected rendezvous list if rdv_adv.is_valid then remove_connected_rendezvous (rdv_adv.peer_id) logger.info ("rendezvous_service: Disconnected from rendezvous peer: " + rdv_adv.peer_id.out + ", group id: " + peer_group.id.out) -- call handlers call_event_handlers ({P2P_RENDEZVOUS_EVENT}.type_disconnected_from_rendezvous, rdv_adv.peer_id) else logger.error ("rendezvous_service: Failed to process incoming Disconnect message, group id: " + peer_group.id.out) end end connect_to_random is -- Connect to a random rendezvous from `available_rendezvous' require Module_started: module_status = start_ok local random: INTEGER trial_list: DS_ARRAYED_LIST [P2P_ENDPOINT_ADDRESS] do from create trial_list.make_from_linear (available_rendezvous) until trial_list.count = 0 or module_status = suspending loop -- find a random rendezvous random := (rand.item \\ trial_list.count) + 1 rand.forth -- can we connect? if peer_group.endpoint_service.ping (trial_list.item (random)) then -- connect send_lease_request (trial_list.item (random), True) last_tried_rendezvous := trial_list.item (random) logger.info ("rendezvous_service: Connect request sent to rendezvous peer: " + trial_list.item (random).out_without_service + ", group id: " + peer_group.id.out) trial_list.wipe_out else -- try another logger.error ("rendezvous_service: Failed to send Connect request to rendezvous peer: " + trial_list.item (random).out_without_service + ", group id: " + peer_group.id.out) remove_available_rendezvous (trial_list.item (random)) trial_list.remove (random) end end end execute is -- Rendezvous connection manager local failed: BOOLEAN step: INTEGER conn_cursor: DS_HASH_TABLE_CURSOR [TUPLE, P2P_PEER_ID] peer_id: P2P_PEER_ID do if not failed and peer_group.when_fully_started then logger.debugging ("rendezvous_service: Starting rendezvous connection manager, group id: " + peer_group.id.out) from step := 0 until module_status = suspending loop -- keep us connected to at least `min_connected_rendezvous_number' rendezvous peers if step = 0 then manage_rendezvous_connections end step := (step + 1) \\ connection_manager_processing_interval sleep (connection_manager_sleep_interval) end -- ordinary module stop: disconnect from all connected rendezvous peers from connected_rdv_lock.acquire_read_lock conn_cursor := connected_rdv.new_cursor conn_cursor.start until conn_cursor.after loop peer_id := conn_cursor.key connected_rdv_lock.release_reader_lock -- send disconnect request, remove rendezvous from this list (moves cursor forth) disconnect (peer_id) connected_rdv_lock.acquire_read_lock end connected_rdv_lock.release_reader_lock end logger.debugging ("rendezvous_service: Stopped rendezvous connection manager, group id: " + peer_group.id.out) rescue failed := True log_exceptions connected_rdv_lock.release_reader_lock retry end manage_rendezvous_connections is -- Manage rendezvous connections (request lease renewals) local cursor: DS_HASH_TABLE_CURSOR [TUPLE [DT_DATE_TIME, DT_DATE_TIME], P2P_PEER_ID] leased_until, renew_at: DT_DATE_TIME address: P2P_ENDPOINT_ADDRESS peer_id: P2P_PEER_ID do -- loop through all running connections from connected_rdv_lock.acquire_read_lock cursor := connected_rdv.new_cursor cursor.start until cursor.after loop -- check for died connection and update connected rendezvous list peer_id := cursor.key leased_until ?= cursor.item.item (1) renew_at ?= cursor.item.item (2) connected_rdv_lock.release_reader_lock if time_passed (leased_until) then -- remove rendezvous and move cursor forth remove_connected_rendezvous (peer_id) logger.info ("rendezvous_service: Lease timed out, removing connection to rendezvous peer: " + peer_id.out + ", group id: " + peer_group.id.out) -- call event handlers call_event_handlers ({P2P_RENDEZVOUS_EVENT}.type_disconnected_from_rendezvous, peer_id) connected_rdv_lock.acquire_read_lock else -- renew lease, if needed if time_passed (renew_at) then create address.make_with_id (peer_id, Void, Void) send_lease_request (address, True) logger.info ("rendezvous_service: Lease renewal request sent to rendezvous peer: " + address.out_without_service + ", group id: " + peer_group.id.out) end connected_rdv_lock.acquire_read_lock cursor.forth end end connected_rdv_lock.release_reader_lock -- connect to one new random rendezvous if too few rendezvous connections if connected_rdv.count < min_connected_rendezvous_number then if last_tried_rendezvous /= Void then remove_available_rendezvous (last_tried_rendezvous) end connect_to_random end rescue connected_rdv_lock.release_reader_lock end feature {NONE} -- Implementation rendezvous_event_handlers: DS_LINKED_LIST [PROCEDURE [ANY, TUPLE [P2P_RENDEZVOUS_EVENT]]] call_event_handlers (a_type: INTEGER; a_peer_id: P2P_PEER_ID) is -- require Peer_valid: a_peer_id /= Void and a_peer_id.is_valid local event: P2P_RENDEZVOUS_EVENT handler_cursor: DS_LIST_CURSOR [PROCEDURE [ANY, TUPLE [P2P_RENDEZVOUS_EVENT]]] do create event.make (a_type, a_peer_id) from handler_cursor := rendezvous_event_handlers.new_cursor handler_cursor.start until handler_cursor.after loop call_handler (handler_cursor.item, event) if not handler_cursor.after then handler_cursor.forth end end end call_handler (a_handler: PROCEDURE [ANY, TUPLE [P2P_RENDEZVOUS_EVENT]]; a_event: P2P_RENDEZVOUS_EVENT) is -- Call handler and catch all exceptions require Handler_valid: a_handler /= Void Event_valid: a_event /= Void local failed: BOOLEAN do if not failed then a_handler.call ([a_event]) end rescue failed := True log_exceptions retry end check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN is -- Are all needed dependencies met? do Result := a_parent.endpoint_service /= Void end time_passed (a_time: DT_DATE_TIME): BOOLEAN is -- Is `a_time' passed already? do Result := a_time <= utc_system_clock.date_time_now end remove_connected_rendezvous (a_peer_id: P2P_PEER_ID) is -- Remove connected rendezvous, synchronized require Id_valid: a_peer_id /= Void do connected_rdv_lock.acquire_write_lock connected_rdv.remove (a_peer_id) connected_rdv_lock.release_writer_lock rescue if connected_rdv_lock.is_set then connected_rdv_lock.release_writer_lock end end remove_available_rendezvous (an_address: P2P_ENDPOINT_ADDRESS) is -- Remove available rendezvous require Address_valid: an_address /= Void local cursor: DS_LIST_CURSOR [P2P_ENDPOINT_ADDRESS] do cursor := available_rendezvous.new_cursor cursor.start cursor.search_forth (an_address) if not cursor.after then cursor.remove end ensure Address_removed: not available_rendezvous.has (an_address) end end