indexing description: "General implementation of a peer group" license: "MIT license (see ../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class P2P_GENERIC_PEERGROUP inherit P2P_PEERGROUP redefine init end create init feature {NONE} -- Initialization init (group: P2P_PEERGROUP; an_id: P2P_ID; advertisement: like implementation_advertisement) is -- Initialize module local string_equality_tester: KL_EQUALITY_TESTER [STRING] do Precursor (group, an_id, advertisement) id ?= an_id create modules_hashed.make_default create string_equality_tester modules_hashed.set_key_equality_tester (string_equality_tester) -- get general configuration from parent group cache_manager := group.cache_manager configuration := group.configuration set_job_queue (group.job_queue) -- peer group parent_group := group peer_group := Current group_advertisement := cache_manager.peergroup_advertisement (id) if group_advertisement = Void then logger.info ("generic_peergroup: Creating new peer group advertisement, group id: " + id.out) create group_advertisement.make (id, implementation_advertisement.specification_id) cache_manager.store_peergroup_advertisement (group_advertisement) end -- peer if not configuration.is_reconfigured then peer_advertisement := cache_manager.peer_advertisement (id, configuration.peer_id) end if peer_advertisement = Void then logger.info ("generic_peergroup: Creating new peer advertisement for group id: " + id.out) create peer_advertisement.copy (parent_group.peer_advertisement) peer_advertisement.set_group_id (id) cache_manager.store_peer_advertisement (peer_advertisement) end peer_id := peer_advertisement.peer_id peer_name := peer_advertisement.name -- look for implementation advertisements retrieve_implementation_advertisements -- get service parameters from peer group, with peer service parameters precedence complete_configuration_with_params (configuration, group_advertisement.service_parameters) complete_configuration_with_params (configuration, peer_advertisement.service_parameters) -- get modules_list from parent group and look for disabled modules (isOff) define_modules remove_disabled_modules -- load modules load_modules -- create full startup notification variables create full_startup_mutex.make create full_startup_cv.make end feature -- Services lookup_module (a_name: STRING): P2P_MODULE is -- Look up a registered module under given `a_name' do if modules_hashed.has (a_name) then Result := modules_hashed.item (a_name) elseif parent_group /= Current then Result := parent_group.lookup_module (a_name) end end modules: DS_LINKED_LIST [TUPLE [STRING, P2P_ID, P2P_MODULE_SPECIFICATION_ID, BOOLEAN]] is -- Current list of modules do Result := modules_list.twin end feature -- Basic operations set_job_queue (a_job_queue: like job_queue) is -- Set a job queue used for this group require Job_queue_valid: a_job_queue /= Void do job_queue := a_job_queue ensure Job_queue_set: job_queue = a_job_queue end start_with_arguments (args: ARRAY [STRING]) is -- start peer group and load every module declared in `modules' (if not already started in parent group) local registered_name: STRING ownership: BOOLEAN module: P2P_MODULE temp_status: INTEGER do logger.debugging ("generic_peergroup: Starting peer group module, group id: " + id.out) -- now start everything in the same order as in `modules_list' from modules_list.start temp_status := start_ok until temp_status = start_failed or modules_list.after loop registered_name ?= modules_list.item_for_iteration.item (module_entry_name) ownership ?= modules_list.item_for_iteration.item (module_entry_ownership) -- not handled in parent group? if ownership = current_is_owner or parent_group.lookup_module (registered_name) = Void or parent_group = Current then module := lookup_module (registered_name) if module /= Void then module.start (args) temp_status := module.module_status if temp_status = start_ok then logger.info ("generic_peergroup: Module successfully started, name: " + registered_name + ", group id: " + id.out) else logger.error ("generic_peergroup: Error starting module, name: " + registered_name + ", group id: " + id.out) end else temp_status := start_failed end end modules_list.forth end module_status := temp_status if temp_status = start_ok then logger.debugging ("generic_peergroup: Successfully started peer group module, group id: " + id.out) signal_full_startup_or_failure else -- Couldn't load all modules, so stop all already started modules logger.error ("generic_peergroup: Error starting peer group module, group id: " + id.out) signal_full_startup_or_failure force_modules_action (False) force_modules_action (True) end end when_fully_started: BOOLEAN is -- Is entire peer group successfully started? Wait until entire peer group is started or a startup failure occurred. do if module_status = start_ok then Result := True else full_startup_mutex.lock full_startup_cv.wait (full_startup_mutex) full_startup_mutex.unlock Result := module_status = start_ok end rescue full_startup_mutex.unlock end suspend is -- Suspend module do force_modules_action (False) module_status := suspended end stop is -- stop peer group and stop declared modules require else Start_ok: module_status = start_ok do if module_status = start_ok then suspend end force_modules_action (True) module_status := stop_ok end load_module_by_specification (an_id: P2P_ID; specification_id: P2P_MODULE_SPECIFICATION_ID; a_name: STRING): P2P_MODULE is -- Find module with given `specification_id', initialize it with current group and `id' and when successful, register it with `a_name' local mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT do logger.debugging ("generic_peergroup: Loading module, id: " + an_id.out + ", msid: " + specification_id.out + ", group id: " + id.out) -- find implementation advertisement mia := cache_manager.module_implementation_advertisement (specification_id) -- initialize module if mia /= Void then if ref_net_peergroup_msid.is_equal (specification_id) then create {P2P_GENERIC_PEERGROUP} Result.init (Current, an_id, mia) elseif ref_endpoint_msid.is_equal (specification_id) then create {P2P_ENDPOINT_SERVICE} Result.init (Current, an_id, mia) elseif ref_transport_tcp_msid.is_equal (specification_id) then create {P2P_TCP_TRANSPORT} Result.init (Current, an_id, mia) elseif ref_transport_router_msid.is_equal (specification_id) then create {P2P_ENDPOINT_ROUTER} Result.init (Current, an_id, mia) elseif ref_discovery_msid.is_equal (specification_id) then create {P2P_DISCOVERY_SERVICE} Result.init (Current, an_id, mia) elseif ref_rendezvous_msid.is_equal (specification_id) then create {P2P_RENDEZVOUS_SERVICE} Result.init (Current, an_id, mia) elseif ref_resolver_msid.is_equal (specification_id) then create {P2P_RESOLVER_SERVICE} Result.init (Current, an_id, mia) else -- not a standard module, try to load extern module logger.debugging ("generic_peergroup: Trying to load extern module, id: " + an_id.out + ", name: " + a_name + ", msid: " + specification_id.out + ", group id: " + id.out) Result := load_extern_module (an_id, mia, a_name) end else logger.error ("generic_peergroup: Module implementation advertisement not found, msid: " + specification_id.out + ", group id: " + id.out) end -- register module if Result /= Void and Result.module_status /= init_failed then register_module (a_name, Result) logger.debugging ("generic_peergroup: Loaded module, id: " + an_id.out + ", msid: " + specification_id.out + ", name: " + a_name + ", group id: " + id.out) end end feature {NONE} -- Implementation Current_is_owner: BOOLEAN is True Parent_is_owner_if_available: BOOLEAN is False modules_list: like modules -- Static list of modules needed/advertised in current peer group (registered_name, assigned_id, msid, ownership) Module_entry_name: INTEGER is 1 Module_entry_id: INTEGER is 2 Module_entry_msid: INTEGER is 3 Module_entry_ownership: INTEGER is 4 modules_hashed: DS_HASH_TABLE [P2P_MODULE, STRING] -- List of registered modules (initialized or started modules) handled by current peer group full_startup_mutex: MUTEX full_startup_cv: CONDITION_VARIABLE signal_full_startup_or_failure is -- Signal full startup for all waiting threads require Fully_started_or_failure: module_status = start_ok or module_status = start_failed do full_startup_cv.broadcast end force_modules_action (should_stop: BOOLEAN) is -- Suspend/stop declared modules. Stop when `should_stop', else suspend. require Status_valid: module_status = start_ok or module_status = start_failed or module_status = suspended local action_name: STRING registered_name: STRING ownership: BOOLEAN module: P2P_MODULE do if should_stop then action_name := "Stopp" else action_name := "Suspend" end logger.debugging ("generic_peergroup: " + action_name + "ing peer group module, group id: " + id.out) -- suspend/stop modules in the reverse order as in `modules_list' from modules_list.finish until modules_list.before loop registered_name ?= modules_list.item_for_iteration.item (module_entry_name) ownership ?= modules_list.item_for_iteration.item (module_entry_ownership) -- not handled in parent group? if ownership = current_is_owner or parent_group.lookup_module (registered_name) = Void or parent_group = Current then module := lookup_module (registered_name) if module /= Void then if not should_stop and module.module_status = start_ok then module.suspend logger.info ("generic_peergroup: " + action_name + "ed module, name: " + registered_name + ", group id: " + id.out) elseif should_stop then if module.module_status = suspended then module.stop logger.info ("generic_peergroup: " + action_name + "ed module, name: " + registered_name + ", group id: " + id.out) end unregister_module (registered_name) end end end modules_list.back end if should_stop then -- destroy full startup mutex/cv full_startup_cv.destroy full_startup_mutex.destroy end logger.debugging ("generic_peergroup: " + action_name + "ed peer group module, group id: " + id.out) end register_module (a_name: STRING; a_module: P2P_MODULE) is -- Register module under given name require Name_valid: a_name /= Void and lookup_module (a_name) = Void Module_valid: a_module /= Void local class_id: P2P_MODULE_CLASS_ID do modules_hashed.force (a_module, a_name) class_id ?= a_module.module_id if class_id /= Void then update_module_reference (class_id, a_module) end ensure Module_registered: lookup_module (a_name) = a_module end unregister_module (a_name: STRING) is -- Unregister module with given `a_name' require Name_valid: a_name /= Void and lookup_module (a_name) /= Void local class_id: P2P_MODULE_CLASS_ID do class_id ?= modules_hashed.item (a_name).module_id if class_id /= Void then update_module_reference (class_id, Void) end modules_hashed.remove (a_name) ensure Module_unregistered: lookup_module (a_name) = Void end define_modules is -- Add modules from parent group to `module_list' using the parent groups instances. do logger.debugging ("generic_peergroup: Inheriting module list from parent group, current group id: " + id.out) modules_list := parent_group.modules -- change ownership here to parent from modules_list.start until modules_list.after loop modules_list.item_for_iteration.item (module_entry_ownership) := parent_is_owner_if_available modules_list.forth end ensure Modules_set: modules_list /= Void end retrieve_implementation_advertisements is -- Retrieve module implementation advertisements from peer group implementation advertisement local mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT params, module, adv_root: XM_ELEMENT do logger.debugging ("generic_peergroup: Retrieving module implementation advertisements from module, group id: " + id.out) params := implementation_advertisement.parameter.document.document.root_element from params.start until params.after loop module ?= params.item_for_iteration if module /= Void and module.name.is_equal ("Svc") or module.name.is_equal ("Proto") or module.name.is_equal ("App") then adv_root := module.element_by_qualified_name (namespace_jxta.uri, "MIA") if adv_root /= Void then create mia.make_from_element (adv_root) if mia.is_valid and not cache_manager.has_module_implementation_advertisement (mia.specification_id) then cache_manager.store_module_implementation_advertisement (mia) logger.debugging ("generic_peergroup: Cached retrieved module implementation advertisement, group id: " + id.out + ", msid: " + mia.specification_id.out) end end end params.forth end end load_modules is -- Load all modules given in `modules_list' local registered_name: STRING ownership: BOOLEAN assigned_id: P2P_ID spec_id: P2P_MODULE_SPECIFICATION_ID module: P2P_MODULE do logger.debugging ("generic_peergroup: Loading modules, group id: " + id.out + ", number: " + modules_list.count.out) from modules_list.start until module_status = init_failed or modules_list.after loop registered_name ?= modules_list.item_for_iteration.item (module_entry_name) ownership ?= modules_list.item_for_iteration.item (module_entry_ownership) -- not handled in parent group? if ownership = current_is_owner or parent_group.lookup_module (registered_name) = Void then assigned_id ?= modules_list.item_for_iteration.item (module_entry_id) spec_id ?= modules_list.item_for_iteration.item (module_entry_msid) module := load_module_by_specification (assigned_id, spec_id, registered_name) if module = Void or module.module_status = init_failed then module_status := init_failed logger.error ("generic_peergroup: Error loading module, name: " + registered_name + ", id: " + assigned_id.out + ", msid: " + spec_id.out) else logger.info ("generic_peergroup: Module loaded, name: " + registered_name + ", id: " + assigned_id.out + ", msid: " + spec_id.out) end else -- share module instance with parent group register_module (registered_name, parent_group.lookup_module (registered_name)) logger.info ("generic_peergroup: Sharing module instance with parent group, name: " + registered_name) end modules_list.forth end if module_status /= init_failed then logger.debugging ("generic_peergroup: Successfully loaded all modules, group id: " + id.out) end end update_module_reference (an_id: P2P_MODULE_CLASS_ID; a_module: P2P_MODULE) is -- Update module reference require Id_set: an_id /= Void do if endpoint_mcid.is_equal (an_id) then endpoint_service ?= a_module elseif discovery_mcid.is_equal (an_id) then discovery_service ?= a_module elseif rendezvous_mcid.is_equal (an_id) then rendezvous_service ?= a_module elseif resolver_mcid.is_equal (an_id) then resolver_service ?= a_module end end complete_configuration_with_params (conf_adv: P2P_CONFIGURATION; params: DS_SPARSE_TABLE [P2P_XML_CACHE, P2P_MODULE_CLASS_ID]) is -- Write each service parameter from `params' to `conf_adv' service parameters require Configuration_valid: conf_adv /= Void Params_valid: params /= Void local mcid: P2P_MODULE_CLASS_ID param: P2P_XML_CACHE do from params.start until params.after loop mcid ?= params.key_for_iteration param ?= params.item_for_iteration if not conf_adv.has_service_parameter (mcid) then conf_adv.add_service_parameter (mcid, param) logger.debugging ("generic_peergroup: Added service parameter to configuration, mcid: " + mcid.out) end params.forth end end remove_disabled_modules is -- Remove modules from `module_list' that are disabled in configuration require Modules_set: modules_list /= Void local params: DS_SPARSE_TABLE_CURSOR [P2P_XML_CACHE, P2P_MODULE_CLASS_ID] mcid: P2P_MODULE_CLASS_ID param: P2P_XML_CACHE do from params := configuration.service_parameters.new_cursor params.start until params.after loop mcid ?= params.key param ?= params.item if param.document.element_by_name ("isOff") /= Void then remove_module (mcid) end params.forth end end remove_module (mcid: P2P_MODULE_CLASS_ID) is -- Remove module with given `mcid' from `modules_list' require Id_valid: mcid /= Void Modules_set: modules_list /= Void local msid: P2P_MODULE_SPECIFICATION_ID found: BOOLEAN do from modules_list.start until found or modules_list.after loop msid ?= modules_list.item_for_iteration.item (module_entry_msid) if msid.is_of_baseclass (mcid) then modules_list.remove_at found := True end if not modules_list.after then modules_list.forth end end end load_extern_module (an_id: P2P_ID; a_mia: P2P_MODULE_IMPLEMENTATION_ADVERTISEMENT; a_name: STRING): P2P_MODULE is -- Load extern module require Id_valid: an_id /= Void and an_id.is_valid Mia_valid: a_mia /= Void and a_mia.is_valid Name_valid: a_name /= Void and not a_name.is_empty do end end