indexing
	description: "Base class for the origo message service."
	author: "Patrick Ruckstuhl "
	date: "$Date$"
	revision: "$Revision$"

deferred class
	O_P2P_MODULE

inherit
	P2P_MODULE
		redefine
			init
		end
	O_CONSTANTS
	P2P_CREATORS_SHARED
	O_SHARED_MESSAGE_FACTORY
	O_SHARED_LOGGERS
	EXCEPTIONS

feature {NONE} -- Initialization

	init (a_group: like peer_group; an_id: like module_id; an_advertisement: like implementation_advertisement) is
			-- Initialize module: cache the discovery and endpoint services of
			-- the peer group, derive the endpoint service name from the module id,
			-- and create the bookkeeping tables, their mutexes and the worker pool.
		do
			Precursor (a_group, an_id, an_advertisement)
			discovery := peer_group.discovery_service
			endpoint := peer_group.endpoint_service
			endpoint_service_name := module_id.out

			create ack_waits.make (100)
			create ack_waits_mutex.make

			create received_set.make_equal (received_window_size)
			create received_queue.make
			create received_mutex.make

			create catch_all_message_handlers
			create message_handlers.make (5)
			create reply_message_handlers.make (20)
			create reply_message_handlers_mutex.make

				-- prepare worker threads
			create worker_jobs.make
			create worker_pool.make (worker_pool_size, worker_jobs)
		end

feature -- Access

	endpoint_service_name: STRING
			-- Used service name for endpoint messages

feature -- Basic operation

	send_message_peer (a_message: O_MESSAGE; a_peer: P2P_PEER_ID): BOOLEAN is
			-- Send `a_message' to `a_peer'. Return true if the sending was successful,
			-- i.e. the wait on the ack condition reported success within
			-- `send_ack_timeout' on one of the attempts.
			-- The message is sent until that happens or `l_retry' exceeds `send_retries'.
			-- If `a_message' has a reply handler it is registered under the message id
			-- before sending.
		require
			a_message_not_void: a_message /= Void
			a_peer_ok: a_peer /= Void
		local
			l_address: P2P_ENDPOINT_ADDRESS
			l_ack_mutex: MUTEX
			l_cv: CONDITION_VARIABLE
			l_retry: NATURAL
		do
				-- register the reply handler
			if a_message.reply_handler /= Void then
				reply_message_handlers_mutex.lock
				reply_message_handlers.force (a_message.reply_handler, a_message.id)
				reply_message_handlers_mutex.unlock
			end

				-- set sending peer
			a_message.set_peer (peer_group.peer_id)
			a_message.set_peer_name (peer_group.peer_name)

				-- create a CONDITION_VARIABLE to wait on the ack
			create l_cv.make
			create l_ack_mutex.make
			ack_waits_mutex.lock
			ack_waits.force ([l_cv, l_ack_mutex], a_message.id)
			ack_waits_mutex.unlock

			from
			until
				Result or l_retry > send_retries
			loop
					-- send message
				create l_address.make_with_id (a_peer, endpoint_service_name, Void)
					-- The ack mutex is taken before sending so that the ack handler in
					-- `process_message' (which locks the same mutex before signalling)
					-- cannot signal before this thread is waiting.
				l_ack_mutex.lock
				if origo_logger.is_enabled_for (debug_p) then
					origo_logger.debugging ("Sending "+a_message.id.out+" "+a_message.namespace+"::"+a_message.type+" to peer "+a_peer.out+" retry "+l_retry.out)
				end
				endpoint.send_message (l_address, a_message.p2p_message)
				origo_logger.debugging ("Sent")

					-- wait on ack
				Result := l_cv.wait_with_timeout (l_ack_mutex, send_ack_timeout)
				l_ack_mutex.unlock
				l_retry := l_retry + 1
			end

				-- Drop the wait entry. The ack handler removes it when an ack arrives,
				-- but when every attempt timed out nobody else would, and the entry
				-- (condition variable + mutex) would stay in `ack_waits' forever.
				-- Removing a key that is already gone is harmless.
				-- `l_ack_mutex' is no longer held here, so taking `ack_waits_mutex'
				-- does not invert the lock order used by the ack handler.
			ack_waits_mutex.lock
			ack_waits.remove (a_message.id)
			ack_waits_mutex.unlock
		ensure
			message_peer_updated: a_message.peer = peer_group.peer_id and a_message.peer_name = peer_group.peer_name
		end

	send_message_reply (a_message, a_original_message: O_MESSAGE): BOOLEAN is
			-- Send `a_message' as a reply to `a_original_message' by setting its
			-- reply id to the original's id and sending it to the original's peer.
			-- Return true if the sending was successful.
		require
			a_message_ok: a_message /= Void
			a_original_message_ok: a_original_message /= Void
		do
			check
				peer_set: a_original_message.peer /= Void
			end
			a_message.set_reply_id (a_original_message.id)
			Result := send_message_peer (a_message, a_original_message.peer)
		end

feature -- Update

	register_catch_all_message_handler (a_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]) is
			-- Register a catch all message handler that receives all received messages.
			-- (Messages consumed by a registered reply handler are not forwarded
			-- to catch all handlers; see `process_message'.)
		require
			a_handler_ok: a_handler /= Void
		do
			catch_all_message_handlers.force (a_handler)
		end

	register_message_handler (a_namespace, a_type: STRING; a_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]) is
			-- Register a message handler to deal with `a_type' in `a_namespace'.
			-- A handler previously registered for the same namespace and type is replaced.
		require
			a_namespace_ok: a_namespace /= Void and then not a_namespace.is_empty
			a_type_ok: a_type /= Void and then not a_type.is_empty
			a_handler_ok: a_handler /= Void
		local
			l_message_handlers: HASH_TABLE [PROCEDURE [ANY, TUPLE [O_MESSAGE]], STRING]
		do
			l_message_handlers := message_handlers.item (a_namespace)
			if l_message_handlers = Void then
					-- first handler for this namespace: create its per-type table
				create l_message_handlers.make (5)
				message_handlers.force (l_message_handlers, a_namespace)
			end
			l_message_handlers.force (a_handler, a_type)
		end

feature {NONE} -- Message Transport

	process_message (a_msg: P2P_MESSAGE; a_source, a_destination: P2P_ENDPOINT_ADDRESS) is
			-- Process incoming endpoint message.
			-- An ack message wakes up the sender waiting in `send_message_peer'.
			-- Any other message is acknowledged to its sending peer and, unless its id
			-- is still in the window of recently received ids, queued as a worker job
			-- that dispatches it to the reply handler or to the registered handlers.
		require
			Message_valid: a_msg /= Void
			Source_valid: a_source /= Void
			Destination_valid: a_destination /= Void and then a_destination.service_name.is_equal (endpoint_service_name)
		local
			l_ack_msg: P2P_MESSAGE_ELEMENT
			l_id: UUID
			l_ack: P2P_MESSAGE
			l_msg: O_MESSAGE
			l_address: P2P_ENDPOINT_ADDRESS
			l_rclck: TUPLE [condition: CONDITION_VARIABLE; mutex: MUTEX]
		do
				-- was it an ack message?
			l_ack_msg := a_msg.element_by_namespace_and_name (message_namespace, ack_element)
			if l_ack_msg /= Void then
					-- if someone is waiting on the ack, signal and remove condition_variable
				create l_id.make_from_string (l_ack_msg.content)
				ack_waits_mutex.lock
				if origo_logger.is_enabled_for (debug_p) then
					origo_logger.debugging ("Received ack "+l_id.out)
				end
				l_rclck := ack_waits.item (l_id)
				if l_rclck /= Void then
						-- lock the mutex, we are not allowed to enter this before the sender thread waits
					l_rclck.mutex.lock
					l_rclck.condition.signal
					ack_waits.remove (l_id)
					l_rclck.mutex.unlock
				end
				ack_waits_mutex.unlock

				-- normal message
			else
					-- get message
				l_msg := message_factory.new_message_from_p2p_message (a_msg)
				if message_factory.is_error then
					origo_logger.error ("Message factory error with message: "+a_msg.out)
				else
					if origo_logger.is_enabled_for (debug_p) then
						origo_logger.debugging ("Message received "+l_msg.id.out+" "+l_msg.namespace+"::"+l_msg.type+" from peer "+l_msg.peer.out+" "+l_msg.peer_name)
					end

						-- send ack
						-- (sent for duplicates as well, so a retransmitting sender still gets one)
					create l_ack.make
					l_ack.extend (create {P2P_MESSAGE_ELEMENT}.make_string (message_namespace, ack_element, Void, l_msg.id.out))
					create l_address.make_with_id (l_msg.peer, endpoint_service_name, Void)
					endpoint.send_message (l_address, l_ack)
					if origo_logger.is_enabled_for (debug_p) then
						origo_logger.debugging ("Ack sent "+l_msg.id.out)
					end

						-- if we did not yet forward this message, forward it and mark it as forwarded
					l_id := l_msg.id
					received_mutex.lock
					if not received_set.has (l_id) then
						received_set.put (l_id)
						received_queue.put (l_id)

							-- if we have reached the queue limit, remove the oldest entry
							-- (`received_queue' is FIFO, so `item' is the oldest id)
						if received_queue.count >= received_window_size then
							received_set.remove (received_queue.item)
							received_queue.remove
						end

							-- add a new job to the worker_job queue which forwards the message
						worker_jobs.force (agent (aa_msg: O_MESSAGE)
							local
								l_retried: BOOLEAN
								ll_id: UUID
								l_handlers: HASH_TABLE [PROCEDURE [ANY, TUPLE [O_MESSAGE]], STRING]
								l_reply_handler: PROCEDURE [ANY, TUPLE [O_MESSAGE]]
							do
									-- after an exception the body is re-entered via `retry'
									-- with `l_retried' set, so the dispatch is skipped
								if not l_retried then
										-- if a reply handler is set, call it, otherwise call the normal handlers
									ll_id := aa_msg.reply_id
									reply_message_handlers_mutex.lock
									l_reply_handler := reply_message_handlers.item (ll_id)
									if l_reply_handler /= Void then
											-- reply handlers are one-shot: unregister before calling
										reply_message_handlers.remove (ll_id)
										reply_message_handlers_mutex.unlock
										l_reply_handler.call ([aa_msg])
									else
										reply_message_handlers_mutex.unlock
										catch_all_message_handlers.call ([aa_msg])
										l_handlers := message_handlers.item (aa_msg.namespace)
										if l_handlers /= Void then
											if l_handlers.has_key (aa_msg.type) then
												l_handlers.found_item.call ([aa_msg])
											end
										end
									end
								end
							rescue
									-- log the failure and finish the job normally instead of
									-- propagating the exception to the worker thread
								l_retried := True
								node_logger.error ("Exception: "+exception_trace)
								retry
							end (l_msg))
					end
					received_mutex.unlock
				end
			end
		end

	process_discovery_response (a_response: P2P_DISCOVERY_RESPONSE) is
			-- Process discovery response.
			-- Does nothing in this class.
		require
			Response_valid: a_response /= Void and then a_response.is_valid
		do
		end

	process_rendezvous_event (an_event: P2P_RENDEZVOUS_EVENT) is
			-- Discover core peers as soon as we're rdv or have rdv connection.
			-- Does nothing in this class.
		require
			Event_valid: an_event /= Void
		do
		end

feature -- Module operations

	start (args: ARRAY [STRING_8])
			-- Start module: register the message and event handlers and
			-- start the worker threads.
		do
			if module_status = initializing then
					-- Register endpoint message handler
				endpoint.extend_service (endpoint_service_name, Void, agent process_message)

					-- Register discovery response handler
				discovery.extend_response_listener (agent process_discovery_response)
			end

				-- Register rendezvous event handler
			peer_group.rendezvous_service.extend_rendezvous_event_handler (agent process_rendezvous_event)

			module_status := start_ok

				-- Start the worker threads
			worker_pool.start
		end

	suspend
			-- Suspend service
		do
				-- Unregister rendezvous event handler
			peer_group.rendezvous_service.prune_rendezvous_event_handler (Current)

			module_status := suspended
		end

	stop
			-- Stop module in any case
		do
				-- Unregister discovery response handler
			discovery.remove_response_listener (Current)

				-- Unregister endpoint message handler
			endpoint.prune_service (endpoint_service_name, Void)

				-- Stop the worker threads
			worker_pool.stop

			module_status := stop_ok
		end

feature {NONE} -- Implementation

	worker_jobs: JOB_QUEUE
			-- Jobs to execute in the worker threads.

	worker_pool: WORKER_POOL
			-- Worker pool that works on worker_jobs.

	discovery: P2P_DISCOVERY_SERVICE
			-- Discovery service of the peer group, set in `init'.

	endpoint: P2P_ENDPOINT_SERVICE
			-- Endpoint service of the peer group, set in `init'.

	check_dependencies (a_parent: P2P_PEERGROUP): BOOLEAN
			-- Are all needed dependencies met?
			-- True if `a_parent' has an endpoint, a discovery and a rendezvous service.
		do
			Result := a_parent.endpoint_service /= Void and a_parent.discovery_service /= Void and a_parent.rendezvous_service /= Void
		end

	ack_waits: HASH_TABLE [TUPLE [condition: CONDITION_VARIABLE; mutex: MUTEX], UUID]
			-- Table for message uuids that wait on acks.

	ack_waits_mutex: MUTEX
			-- Mutex protection for ack_waits.

	received_set: DS_HASH_SET [UUID]
			-- Table with uuids of received messages.

	received_queue: DS_LINKED_QUEUE [UUID]
			-- UUID of received messages in arrival order (oldest first),
			-- used to remove old entries from received_set.

	received_mutex: MUTEX
			-- Mutex protection for received_(set|queue)

	catch_all_message_handlers: ACTION_SEQUENCE [TUPLE [O_MESSAGE]]
			-- Message handlers that are always called.

	message_handlers: HASH_TABLE [HASH_TABLE [PROCEDURE [ANY, TUPLE [O_MESSAGE]], STRING], STRING]
			-- Message handlers for a type in a namespace.

	reply_message_handlers: HASH_TABLE [PROCEDURE [ANY, TUPLE [O_MESSAGE]], UUID]
			-- Message handlers that wait on a reply of a certain message.

	reply_message_handlers_mutex: MUTEX
			-- Lock for reply_message_handlers.

feature {NONE} -- Constants

	worker_pool_size: NATURAL_16 is 32
			-- Number of worker threads.

invariant
	ack_waits_not_void: ack_waits /= Void
	ack_waits_mutex_not_void: ack_waits_mutex /= Void
	received_set_not_void: received_set /= Void
	received_queue_not_void: received_queue /= Void
	received_mutex_not_void: received_mutex /= Void
	catch_all_message_handlers_not_void: catch_all_message_handlers /= Void
	message_handlers_not_void: message_handlers /= Void
	reply_message_handlers_not_void: reply_message_handlers /= Void
	reply_message_handlers_mutex_not_void: reply_message_handlers_mutex /= Void

end