indexing
	description: "Client version of the origo message service."
	author: "Patrick Ruckstuhl "
	date: "$Date$"
	revision: "$Revision$"

class
	O_P2P_MODULE_CLIENT

inherit
	O_P2P_MODULE
		redefine
			process_rendezvous_event,
			init
		end

	THREAD_CONTROL
		export
			{NONE} all
		end

create
	init

feature {NONE} -- Initialization

	init (a_group: like peer_group; an_id: like module_id; an_advertisement: like implementation_advertisement) is
			-- Initialize through the inherited `init', then create the
			-- (initially empty) list of core peers and the lock guarding it.
		do
			Precursor (a_group, an_id, an_advertisement)
			create core_peers.make (5)
			create core_peers_lock.make
		end

feature -- Basic operation

	send_message_core (a_message: O_MESSAGE): BOOLEAN is
			-- Send `a_message' to a randomly chosen core peer; true on success.
			-- A core peer for which sending fails is removed from `core_peers'
			-- and another one is tried, until one succeeds or none is left.
			-- If no core peer is known on entry, a discovery is started first
			-- (only when the rendezvous service is connected).
		require
			a_message_ok: a_message /= Void
		local
			l_cnt: INTEGER
			l_rnd_dbl: DOUBLE
			l_rnd: INTEGER
			l_peer: P2P_PEER_ID
		do
			core_peers_lock.acquire_read_lock
			l_cnt := core_peers.count

				-- If we don't have any cores, try to discover them.
			if l_cnt = 0 then
					-- `discover_core' takes the write lock itself, so the
					-- read lock has to be given up around the call.
				core_peers_lock.release_reader_lock
					-- `discover_core' requires a connected rendezvous service;
					-- establish its precondition instead of violating it.
				if peer_group.rendezvous_service.is_connected then
					discover_core
				end
				core_peers_lock.acquire_read_lock
				l_cnt := core_peers.count
			end

			from
			until
				Result or l_cnt = 0
			loop
					-- Choose a random core: `double_item' is a number between
					-- 0 and 1, convert it to an index between 1 and `l_cnt'.
				l_rnd_dbl := rand.double_item
				rand.forth
				l_rnd := 1 + (l_rnd_dbl * (l_cnt - 1)).rounded

					-- Remember the peer itself: the index is only meaningful
					-- while the read lock is held.
				l_peer := core_peers.i_th (l_rnd)

					-- Send message.
				Result := send_message_peer (a_message, l_peer)

					-- On failure, remove the core we tried.
				if not Result then
					core_peers_lock.release_reader_lock
					core_peers_lock.acquire_write_lock
						-- The list may have been changed by another thread
						-- while no lock was held, so remove by identity rather
						-- than by the (possibly stale) index `l_rnd'.
						-- Nothing happens if the peer is already gone.
					core_peers.prune_all (l_peer)
					core_peers_lock.release_writer_lock
					core_peers_lock.acquire_read_lock
					l_cnt := core_peers.count
				end
			end
			core_peers_lock.release_reader_lock
		end

feature {NONE} -- Message transport

	process_core_discovery_response (a_response: P2P_DISCOVERY_RESPONSE) is
			-- Process core discovery response: for every peer advertisement
			-- in `a_response', publish it locally, publish our own peer
			-- advertisement to that peer and record its id in `core_peers'.
			-- Entries that are not peer advertisements are skipped.
		local
			l_adv, l_own_adv: P2P_PEER_ADVERTISEMENT
			l_responses: DS_LIST [P2P_ADVERTISEMENT]
		do
			l_responses := a_response.responses
			if l_responses /= Void then
				from
					l_responses.start
				until
					l_responses.after
				loop
					l_adv ?= l_responses.item_for_iteration
						-- The assignment attempt yields Void for anything that
						-- is not a peer advertisement; test explicitly so that
						-- this also holds when assertion checking is disabled.
					if l_adv /= Void then
						origo_logger.info ("Core discovery response from "+l_adv.name+" "+l_adv.peer_id.out)

							-- Publish received adv locally.
						discovery.publish_advertisement_locally (l_adv)

							-- Publish our adv to the core.
						l_own_adv := peer_group.peer_advertisement
							-- Lasts for 30 minutes.
						l_own_adv.set_expiration_time (30*60*1000)
						discovery.publish_advertisement_remotely (l_own_adv, l_adv.peer_id)

							-- Store peer id.
						core_peers_lock.acquire_write_lock
						core_peers.force (l_adv.peer_id)
						core_peers_lock.release_writer_lock
					end
					l_responses.forth
				end
			end
		end

	process_rendezvous_event (an_event: P2P_RENDEZVOUS_EVENT) is
			-- Discover core peers as soon as we're rdv or have rdv connection,
			-- i.e. on a connected, became-rendezvous or reconnected event.
		do
			if
				an_event.type = {P2P_RENDEZVOUS_EVENT}.type_connected_to_rendezvous or
				an_event.type = {P2P_RENDEZVOUS_EVENT}.type_became_rendezvous or
				an_event.type = {P2P_RENDEZVOUS_EVENT}.type_reconnected_to_rendezvous
			then
				discover_core
			end
		end

feature -- Auxiliary

	discover_core is
			-- Forget all known core peers and send a remote discovery query
			-- for peers named `core_peer_name'. Responses are handed to
			-- `process_core_discovery_response', which refills `core_peers'.
		require
			Network_connected: peer_group.rendezvous_service.is_connected
		local
			query: P2P_DISCOVERY_QUERY
		do
			origo_logger.info ("Core discovery")

				-- Clear old core peers.
			core_peers_lock.acquire_write_lock
			core_peers.wipe_out
			core_peers_lock.release_writer_lock

				-- Create discovery query.
			create query.make (discovery.type_peer)
			query.set_threshold (10)
			query.set_restriction ("Name", core_peer_name)

				-- Send discovery query.
			logger.info ("p2p_module: Sending core peer discovery request, group id: " + peer_group.id.out)
			discovery.query_remote_advertisements (query, Void, agent process_core_discovery_response)
		end

feature -- Access

	core_peers: ARRAYED_LIST [P2P_PEER_ID]
			-- Core peers.
			-- Access only while holding `core_peers_lock'.

	core_peers_lock: READ_WRITE_LOCK
			-- Protection for `core_peers'.

feature {NONE} -- Implementation

	rand: RANDOM is
			-- Random number generator.
		once
				-- We don't seed it, as the default seed is good enough for our purpose.
			create Result.make
		end

invariant
	core_peers_not_void: core_peers /= Void
	core_peers_lock_not_void: core_peers_lock /= Void

end