indexing
	description: "TCP transport module"
	license: "MIT license (see ../../../license.txt)"
	author: "Beat Strasser "
	date: "$Date$"
	revision: "$Revision$"

class
	P2P_TCP_TRANSPORT

inherit
	P2P_MESSAGE_SENDER_TRANSPORT
		redefine
			init, start, suspend, stop
		end

	P2P_MESSAGE_RECEIVER_TRANSPORT
		redefine
			init, start, suspend, stop
		end

	THREAD
		export
			{NONE} all
		end

	P2P_CREATORS_SHARED

	EXCEPTIONS
		export
			{NONE} all
		end

create
	init

feature {NONE} -- Initialization

	init (group: P2P_PEERGROUP; id: P2P_ID; advertisement: like implementation_advertisement) is
			-- Initialize module: create the connection table, the outgoing queue and
			-- their synchronization objects, then read the TCP configuration for this
			-- module from the configuration of `group'.
			-- Sets `module_status' to `init_failed' if no valid configuration is found.
		local
			mcid: P2P_MODULE_CLASS_ID
		do
			create connections.make (initial_connections_size)
			create connections_lock.make
			create outgoing_queue.make
			create outgoing_queue_lock.make
			create outgoing_queue_cv.make
			Precursor (group, id, advertisement)

				-- get configuration parameters from advertisement
			mcid ?= id
			configuration ?= group.configuration.service_parameter (mcid)
			if configuration /= Void and configuration.is_valid then
				logger.debugging ("tcp_transport: TCP configuration valid, group id: " + peer_group.id.out)
			else
					-- Failure in configuration. Cannot start.
				module_status := init_failed
				logger.error ("tcp_transport: No or invalid tcp advertisement provided in configuration, group id: " + peer_group.id.out)
			end
		end

feature -- Access

	name: STRING is "tcp"
			-- Protocol name

	Multicast_ip: STRING is "224.0.1.85"
			-- IP address multicast datagrams are sent to

	Multicast_port: INTEGER is 1234
			-- Port multicast datagrams are sent to

	Multicast_max_packet_size: INTEGER is 16384
			-- Upper bound (exclusive) for the size of a multicast packet

	Wire_message_format_mimetype: STRING is "application/x-jxta-msg"
			-- Mimetype of wire message format

	Wire_message_version: INTEGER is 1
			-- Version of wire message format

	running_connection (address: STRING): P2P_TCP_CONNECTION is
			-- Currently registered connection for this `address', e.g.
			-- tcp://123.123.123.123:9700 (no host name).
			-- Void if no connection is registered for `address'.
		require
			Status_started: module_status = start_ok
			Address_existent: address /= Void
		do
			connections_lock.acquire_read_lock
			Result := connections.item (address)
			connections_lock.release_reader_lock
		rescue
			connections_lock.release_reader_lock
		end

feature {P2P_TCP_CONNECTION} -- Internal access

	Job_queue_tag: STRING is "vampeer_transport_tcp"
			-- Tag passed to the peer group's job queue in `suspend' to join
			-- the connection lifecycle jobs

feature -- Element Change

	extend_connection (connection: P2P_TCP_CONNECTION; address: STRING) is
			-- Register `connection' with given `address', e.g.
			-- tcp://123.123.123.123:9700 (no host name).
			-- A connection previously registered for `address' is replaced and closed.
		require
			Status_started: module_status = start_ok
			Connection_valid: connection /= Void
			Address_existent: address /= Void
		local
			old_connection: like connection
			write_locked: BOOLEAN
		do
			connections_lock.acquire_write_lock
			write_locked := True
			connections.force (connection, address)
			if connections.found then
				old_connection := connections.found_item
			end
				-- Release before logging/closing; the flag keeps the rescue clause
				-- from releasing the lock a second time if those calls fail.
			write_locked := False
			connections_lock.release_writer_lock

			if old_connection /= Void then
				logger.debugging ("tcp_transport: Registered renewed connection to address: " + address + ", group id: " + peer_group.id.out)
				old_connection.close
			else
				logger.debugging ("tcp_transport: Registered new connection to address: " + address + ", group id: " + peer_group.id.out)
			end
		rescue
			if write_locked then
				write_locked := False
				connections_lock.release_writer_lock
			end
		end

feature -- Removal

	prune_connection (connection: P2P_TCP_CONNECTION; address: STRING) is
			-- Unregister given `connection' with `address', e.g.
			-- tcp://123.123.123.123:9700 (no host name).
			-- Nothing happens if another connection is registered for `address'.
		require
			Status_started: module_status = start_ok
			Connection_valid: connection /= Void
			Address_existent: address /= Void
		do
			connections_lock.acquire_write_lock
			if connections.item (address) = connection then
				connections.remove (address)
				logger.debugging ("tcp_transport: Unregistered connection to address: " + address + ", group id: " + peer_group.id.out)
			end
			connections_lock.release_writer_lock
		rescue
			connections_lock.release_writer_lock
		end

feature -- Module operations

	start (args: ARRAY [STRING]) is
			-- Start module: reset connection table and outgoing queue, start the
			-- TCP server and the outgoing queue manager thread.
			-- Sets `module_status' to `start_failed' if the server is not running.
		do
			logger.debugging ("tcp_transport: Starting module, id: " + module_id.out + ", group id: " + peer_group.id.out)

				-- prepare for starting
			module_status := initializing
			qm_status := queue_manager_running
			if connections.count > 0 then
				create connections.make (initial_connections_size)
			end
			if outgoing_queue.count > 0 then
				outgoing_queue_lock.lock
				create outgoing_queue.make
				outgoing_queue_lock.unlock
			end

				-- start server
			create_server
			if server /= Void and server.is_running then
					-- start server thread (waiting for connections)
				server.launch
				logger.debugging ("tcp_transport: TCP server successfully started, group id: " + peer_group.id.out)

					-- start queue manager
				launch
				logger.debugging ("tcp_transport: Outgoing queue manager thread launched, group id: " + peer_group.id.out)

					-- register transport, mark module as started
				Precursor (args)
			else
					-- can't launch tcp server, fail
				logger.error ("tcp_transport: Error starting TCP server, group id: " + peer_group.id.out)
				module_status := start_failed
			end
		end

	suspend is
			-- Stop module and wait until all child threads are finished
		local
			conn_list: ARRAYED_LIST [P2P_TCP_CONNECTION]
			read_locked: BOOLEAN
			queue_locked: BOOLEAN
		do
			logger.debugging ("tcp_transport: Stopping module, id: " + module_id.out + ", group id: " + peer_group.id.out)

				-- terminate our queue manager thread.
				-- The state change and the signal happen while holding the queue mutex:
				-- the queue manager checks `qm_status' under the same mutex before it
				-- waits, so it cannot miss this notification and block `join' forever.
			outgoing_queue_lock.lock
			queue_locked := True
			qm_status := queue_manager_stopping
			outgoing_queue_cv.signal -- notify queue manager to stop
			queue_locked := False
			outgoing_queue_lock.unlock
			logger.debugging ("tcp_transport: Signaled queue manager thread to stop, group id: " + peer_group.id.out)

				-- stop server
			server.shutdown

				-- stop connections (closed outside of the lock, on a snapshot of the table)
			logger.debugging ("tcp_transport: Closing all open connections, group id: " + peer_group.id.out)
			connections_lock.acquire_read_lock
			read_locked := True
			conn_list := connections.linear_representation
			read_locked := False
			connections_lock.release_reader_lock
			conn_list.do_all (agent (c: P2P_TCP_CONNECTION) do c.close end)

				-- wait for child threads
			join -- queue manager thread
			logger.debugging ("tcp_transport: Outgoing queue manager stopped, group id: " + peer_group.id.out)
			server.join -- server
			server := Void
			logger.debugging ("tcp_transport: TCP server stopped, group id: " + peer_group.id.out)
			peer_group.job_queue.join_jobs_for_tag (job_queue_tag)
			logger.debugging ("tcp_transport: All connection lifecycle handling jobs stopped, group id: " + peer_group.id.out)

				-- mark module as suspended
			Precursor
			logger.debugging ("tcp_transport: Module successfully suspended, id: " + module_id.out + ", group id: " + peer_group.id.out)
		rescue
				-- Only release what is actually held at the point of failure
			if queue_locked then
				queue_locked := False
				outgoing_queue_lock.unlock
			end
			if read_locked then
				read_locked := False
				connections_lock.release_reader_lock
			end
		end

	stop is
			-- Stop module and destroy its synchronization objects
		do
				-- unregister transport, mark module as stopped
			Precursor

				-- Destroy data structures
			outgoing_queue_lock.destroy
			outgoing_queue_cv.destroy
			connections_lock.destroy
			logger.debugging ("tcp_transport: Module successfully stopped, id: " + module_id.out + ", group id: " + peer_group.id.out)
		end

feature -- Basic operations

	process_incoming_message (message: P2P_MESSAGE) is
			-- Process incoming message (no queueing, directly call demuxing routines from upper layers).
			-- Exceptions raised during demuxing are logged and not propagated.
		local
			failed: BOOLEAN
		do
			if not failed then
				logger.debugging ("tcp_transport: Processing incoming message, group id: " + peer_group.id.out)
				endpoint_service.demux (message)
			else
				logger.error ("tcp_transport: Error processing incoming message, group id: " + peer_group.id.out)
			end
		rescue
				-- catch all failures from called modules/services during processing
			failed := True
			logger.error ("tcp_transport: Exception while processing incoming message, trace:%N" + exception_trace)
			retry
		end

	ping (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is
			-- Ping a peer: True if a connection to `address' is already registered,
			-- or if a new connection could be opened by the queue manager.
			-- Blocks until the queue manager has reported the outcome of the request.
		local
			resolved_address: like address
			result_store: STRING
			result_lock: MUTEX
			result_cv: CONDITION_VARIABLE
		do
				-- Resolve host name to ip, if necessary
			resolved_address := address.address_with_resolved_host_name
			if resolved_address = Void or resolved_address.has_ipv6_address then
				logger.debugging ("tcp_transport: Invalid host name, address: " + address.out + ", group id: " + peer_group.id.out)
				Result := False
			elseif not has_running_connection (resolved_address.out_without_service) then
				logger.debugging ("tcp_transport: Pinging peer with new connection, address: " + resolved_address.out + ", group id: " + peer_group.id.out)

					-- create mutex and condition variable
				create result_lock.make
				create result_cv.make

					-- add request for opening connection to outgoing queue.
					-- The handler always records an outcome ('1' success, '0' failure) and
					-- signals while still holding the lock, so that this routine cannot
					-- destroy the condition variable before the signal has been issued.
				result_lock.lock
				create result_store.make_empty
				outgoing_queue_extend (resolved_address, Void, agent (lock: MUTEX; cv: CONDITION_VARIABLE; store: STRING; an_address: P2P_ENDPOINT_ADDRESS; success: BOOLEAN)
					do
						lock.lock
						if success then
							store.append_character ('1')
						else
							store.append_character ('0')
						end
						cv.signal
						lock.unlock
					end (result_lock, result_cv, result_store, ?, ?))

					-- now wait for signaled completed request; loop on the recorded
					-- outcome so that a spurious wake-up is not mistaken for a failure
				from
				until
					result_store.count > 0
				loop
					result_cv.wait (result_lock)
				end
				Result := result_store.item (1) = '1'

					-- clean up threading stuff
				result_lock.unlock
				result_cv.destroy
				result_lock.destroy
				logger.debugging ("tcp_transport: Pinged peer with new connection, address: " + resolved_address.out + ", group id: " + peer_group.id.out + ", responded: " + Result.out)
			else
					-- we have an open connection to this address, so we don't test
				Result := True
				logger.debugging ("tcp_transport: Peer is up (we have an open connection to it), address: " + resolved_address.out + ", group id: " + peer_group.id.out)
			end
		end

	has_open_connection (address: P2P_ENDPOINT_ADDRESS): BOOLEAN is
			-- Does open connection to peer with given `address' exist?
			-- False if the host name cannot be resolved or resolves to an IPv6 address.
		local
			resolved_address: like address
		do
				-- Resolve host name to ip, if necessary
			resolved_address := address.address_with_resolved_host_name
			if resolved_address /= Void and not resolved_address.has_ipv6_address then
				Result := has_running_connection (resolved_address.out_without_service)
			else
				logger.debugging ("tcp_transport: Invalid host name, address: " + address.out + ", group id: " + peer_group.id.out)
			end
		end

	propagate (message: P2P_MESSAGE; service_name, service_parameter: STRING) is
			-- Send broadcast message through IP multicast, if enabled.
			-- Packets reaching `Multicast_max_packet_size' are not sent; an error is logged instead.
		local
			el_source_addr, el_dest_addr: P2P_MESSAGE_ELEMENT
			multicast_address: P2P_ENDPOINT_ADDRESS
			wire_header: P2P_TCP_HEADER
			wire_body: P2P_WIRE_MESSAGE
			socket: NETWORK_DATAGRAM_SOCKET
		do
			if configuration.multicast_enabled then
				if service_parameter /= Void then
					logger.debugging ("tcp_transport: Propagating message to service: " + service_name.out + ", param: " + service_parameter.out + ", group id: " + peer_group.id.out)
				else
					logger.debugging ("tcp_transport: Propagating message to service: " + service_name.out + ", group id: " + peer_group.id.out)
				end

					-- add source/destination data to message
				create el_source_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_source_address_element_name, Void, local_address.out_without_service)
				message.replace (el_source_addr)
				create multicast_address.make_with_service (name, multicast_ip + ":" + multicast_port.out, service_name, service_parameter)
				create el_dest_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_destination_address_element_name, Void, multicast_address.out)
				message.replace (el_dest_addr)

					-- body
				wire_body := wire_message_creator.create_from_message (wire_message_creator.mimetype_binary, message)
				wire_body.send_to_buffer (wire_message_version)

					-- header
				create wire_header.make
				wire_header.set_content_length (wire_body.buffer_length)
				wire_header.set_content_type (wire_message_format_mimetype)
				wire_header.replace_header (multicast_source_endpoint_address_header_name, local_address.out)
				wire_header.send_to_buffer

					-- send datagram
				if Multicast_jxta_header.count + wire_header.buffer_length + wire_body.buffer_length < multicast_max_packet_size then
					logger.debugging ("tcp_transport: Creating multicast client socket, ip: " + multicast_ip + ", port: " + multicast_port.out + ", group id: " + peer_group.id.out)
					create socket.make_targeted_to_ip (multicast_ip, multicast_port)
						-- TODO change to non blocking as soon as eiffel net supports non blocking calls
					socket.set_blocking
					socket.put_string (Multicast_jxta_header + wire_header.buffer + wire_body.buffer)
					socket.set_non_blocking
					socket.cleanup
					logger.debugging ("tcp_transport: Successfully sent multicast packet, group id: " + peer_group.id.out)
				else
					logger.error ("tcp_transport: Multicast packet size oversized, service: " + service_name.out + ", group id: " + peer_group.id.out)
				end
			end
		end

	send_message (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE) is
			-- Queues message for a specific peer.
			-- The message is stamped with source and destination address elements
			-- and handed to the outgoing queue; sending happens in the queue manager thread.
		local
			resolved_address: like address
			el_source_addr, el_dest_addr: P2P_MESSAGE_ELEMENT
		do
			resolved_address := address.address_with_resolved_host_name
			if resolved_address /= Void and not resolved_address.has_ipv6_address then
				logger.debugging ("tcp_transport: Sending message to address: " + resolved_address.out + ", group id: " + peer_group.id.out)

					-- add source/destination data to message
				create el_source_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_source_address_element_name, Void, local_address.out_without_service)
				message.replace (el_source_addr)
				create el_dest_addr.make_string (message.namespace_jxta, endpoint_service.endpoint_destination_address_element_name, Void, resolved_address.out)
				message.replace (el_dest_addr)

					-- add message to outgoing queue
				outgoing_queue_extend (resolved_address, message, Void)
				logger.debugging ("tcp_transport: Outgoing message queued, address: " + resolved_address.out + ", group id: " + peer_group.id.out)
				yield
			else
				logger.error ("tcp_transport: Invalid host name, address: " + address.out + ", group id: " + peer_group.id.out)
			end
		end

	handle_incoming_connection (socket: NETWORK_STREAM_SOCKET) is
			-- Handle incoming connection.
			-- Exceptions raised while creating the connection are logged and not propagated.
		require
			Status_started: module_status = start_ok
			Socket_existent: socket /= Void
		local
			connection: P2P_TCP_CONNECTION
			failed: BOOLEAN
		do
			if not failed then
					-- Connection will enqueue a new job for its lifecycle handling
					-- Cannot register now as we don't know exactly the remotes public endpoint address
				logger.debugging ("tcp_transport: Receiving new incoming connection, initializing, group id: " + peer_group.id.out)
				create connection.make_from_socket (Current, socket)
			end
		rescue
			failed := True
			logger.error ("tcp_transport: Exception while handling incoming connection, trace:%N" + exception_trace)
			retry
		end

feature {NONE} -- Implementation

	execute is
			-- Outgoing message queue managing thread.
			-- Repeatedly walks the outgoing queue; for every entry it looks up or opens a
			-- connection, sends the message (if any) and reports the outcome to the
			-- entry's result handler. Runs until `qm_status' is `Queue_manager_stopping'.
		local
			address: P2P_ENDPOINT_ADDRESS
			message: P2P_MESSAGE
			connection_trials: INTEGER
			result_handler: PROCEDURE [ANY, TUPLE [P2P_ENDPOINT_ADDRESS, BOOLEAN]]
			connection: P2P_TCP_CONNECTION
			wait_step: INTEGER
			should_stop: BOOLEAN
			removed: BOOLEAN
			queue_locked: BOOLEAN
		do
			logger.debugging ("tcp_transport: Queue manager thread starting, group id: " + peer_group.id.out)

				-- loop until module stopped
			from
			until
				qm_status = queue_manager_stopping
			loop
					-- loop through all outgoing messages in queue
				from
					outgoing_queue_lock.lock
					queue_locked := True
						-- `qm_status' is re-checked under the mutex: `suspend' changes it and
						-- signals while holding the same mutex, so a stop request issued
						-- after the outer loop test cannot be lost.
					if outgoing_queue.count = 0 and then qm_status /= queue_manager_stopping then
							-- wait until someone puts messages in the queue or we should stop
						logger.debugging ("tcp_transport: Queue manager waiting for new messages to send, group id: " + peer_group.id.out)
						outgoing_queue_cv.wait (outgoing_queue_lock)
					end
					if outgoing_queue.count > 0 then
						should_stop := False
						outgoing_queue.start
					else
						should_stop := True -- we should stop
					end
				until
					should_stop or outgoing_queue.after
				loop
						-- processing next message in queue:
					address ?= outgoing_queue.item.item (1)
					message ?= outgoing_queue.item.item (2)
					connection_trials ?= outgoing_queue.item.item (3)
					result_handler ?= outgoing_queue.item.item (4)
					queue_locked := False
					outgoing_queue_lock.unlock
					removed := False
					connection := running_connection (address.out_without_service)

						-- create new connection if necessary
					if connection = Void and connection_trials < max_connection_trials then
						connection_trials := increment_connection_trials
						logger.debugging ("tcp_transport: Queue manager creating new connection for address: " + address.out + ", connection trials: " + connection_trials.out + ", group id: " + peer_group.id.out)
						create connection.make_new (Current, address)
						yield
					elseif connection /= Void then
						logger.debugging ("tcp_transport: Queue manager reusing open connection for address: " + address.out + ", connection trials: " + connection_trials.out + ", group id: " + peer_group.id.out)
					end

					if connection /= Void then
							-- wait for ready connection
						from
							wait_step := 0
						until
							connection.is_ready or connection.is_closed or wait_step >= wait_for_ready_connection_max_steps
						loop
							sleep (wait_for_ready_connection_interval)
							wait_step := wait_step + 1
						end

						if connection.is_ready then
								-- connection is ready
							if message /= Void then
									-- send message (blocking)
								logger.debugging ("tcp_transport: Connection ready for sending queued message, address: " + address.out + ", group id: " + peer_group.id.out)
								connection.send_message (message)
								if connection.last_send_failed then
										-- entry stays in the queue and is retried on a later pass
									if connection.is_developer_exception then
										logger.error ("tcp_transport: Error sending message to address: " + address.out + ", error: " + connection.developer_exception_name + ", group id: " + peer_group.id.out)
									else
										logger.error ("tcp_transport: Error sending message to address: " + address.out + ", error: ?, group id: " + peer_group.id.out)
									end
								else
										-- message successfully sent
									call_result_handler (result_handler, address, True)
									outgoing_queue_remove
									removed := True
									logger.debugging ("tcp_transport: Removed message from outgoing queue, address: " + address.out + ", group id: " + peer_group.id.out)
								end
							else
									-- simple connection open request successful, just call handler
								call_result_handler (result_handler, address, True)
								outgoing_queue_remove
								removed := True
								logger.debugging ("tcp_transport: Connection ready, removed request from outgoing queue, address: " + address.out + ", group id: " + peer_group.id.out)
							end
						end
					else
							-- still no success after `max_connection_trials', discard message/request!
						call_result_handler (result_handler, address, False)
						outgoing_queue_remove
						removed := True
						logger.error ("tcp_transport: Discarding message/request to address (timed out): " + address.out + ", group id: " + peer_group.id.out)
					end

					outgoing_queue_lock.lock
					queue_locked := True
						-- Removing the current entry already moved the cursor to its right
						-- neighbour; advancing again would skip that entry and reorder
						-- the queued messages.
					if not removed and not outgoing_queue.after then
						outgoing_queue.forth
					end
				end
				queue_locked := False
				outgoing_queue_lock.unlock
			end
			logger.debugging ("tcp_transport: Terminating queue manager, group id: " + peer_group.id.out)
		rescue
				-- Only unlock if the failure happened while the queue mutex was held
			if queue_locked then
				queue_locked := False
				outgoing_queue_lock.unlock
			end
		end

	configuration: P2P_TCP_CONFIGURATION
			-- TCP configuration read from the group configuration in `init'

	server: P2P_TCP_SERVER
			-- TCP server created by `create_server'; Void after `suspend'

	qm_status: INTEGER
			-- State of the queue manager thread
			-- (`Queue_manager_running' or `Queue_manager_stopping')

	connections: HASH_TABLE [P2P_TCP_CONNECTION, STRING]
			-- Registered connections, indexed by address string

	connections_lock: READ_WRITE_LOCK
			-- Guards `connections'

	outgoing_queue: LINKED_LIST [TUPLE [P2P_ENDPOINT_ADDRESS, P2P_MESSAGE, INTEGER, PROCEDURE [ANY, TUPLE [P2P_ENDPOINT_ADDRESS, BOOLEAN]]]]
			-- Pending requests: destination address, message (Void for a pure
			-- connection request), connection trials so far, result handler (may be Void)

	outgoing_queue_lock: MUTEX
			-- Guards `outgoing_queue'

	outgoing_queue_cv: CONDITION_VARIABLE
			-- Signaled when an entry is added to `outgoing_queue' or the queue manager should stop

	Queue_manager_running: INTEGER is 0

	Queue_manager_stopping: INTEGER is 1

	Initial_connections_size: INTEGER is 10

	Wait_for_ready_connection_interval: INTEGER_64 is 50000000
			-- 50ms

	Wait_for_ready_connection_max_steps: INTEGER is 10
			-- 500ms

	Max_connection_trials: INTEGER is 3

	Multicast_source_endpoint_address_header_name: STRING is "srcEA"

	Multicast_jxta_header: STRING is "JXTA"

	create_server is
			-- Start TCP server and configure local address.
			-- Also updates (or creates) the route/accesspoint advertisement of the peer
			-- with the new local address and stores the peer advertisement.
		require
			Configuration_valid: configuration /= Void and configuration.is_valid
		local
			local_ip: STRING
			ra: P2P_ROUTE_ADVERTISEMENT
			apa: P2P_ACCESSPOINT_ADVERTISEMENT
		do
				-- create server instance and bind port
			if configuration.has_interface_address then
					-- use configured interface
				local_ip := configuration.interface_address
			else
					-- use detected interface
				local_ip := peer_group.configuration.local_ip
			end
			if configuration.has_port then
					-- use configured port
				create server.make (Current, local_ip, configuration.port, configuration.port)
			else
					-- use a random user port
				create server.make (Current, local_ip, 1024, {NATURAL_16}.max_value)
			end

				-- create local endpoint address
			create local_address.make (name, local_ip + ":" + server.port.out)

				-- create/update peer's route advertisement
			ra := endpoint_service.route_advertisement
			apa := endpoint_service.accesspoint_advertisement
			if ra /= Void and apa /= Void then
				apa.remove_addresses_with_protocol (name)
				apa.add_address (local_address)
				ra.renew_document
				peer_group.peer_advertisement.renew_document
			else
					-- create new route/accesspoint advertisement
				create apa.make
				apa.add_address (local_address)
				create ra.make (apa)
				peer_group.peer_advertisement.replace_service_parameter (endpoint_mcid, ra)
			end

				-- save peer advertisement
			peer_group.cache_manager.store_peer_advertisement (peer_group.peer_advertisement)
			if peer_group.discovery_service /= Void then
					-- publish our route advertisement; set the peer id
				ra := ra.twin
				ra.set_destination_peer_id (peer_group.peer_id)
				peer_group.discovery_service.publish_advertisement_locally (ra)
			end
		ensure
			Server_set: server /= Void
			Apa_set: endpoint_service.accesspoint_advertisement /= Void
		end

	outgoing_queue_extend (address: P2P_ENDPOINT_ADDRESS; message: P2P_MESSAGE; result_handler: PROCEDURE [ANY, TUPLE [P2P_ENDPOINT_ADDRESS, BOOLEAN]]) is
			-- Add message to queue locking the queue, then notify the queue manager.
			-- A Void `message' denotes a pure connection request.
		require
			Address_valid: address /= Void
		do
			outgoing_queue_lock.lock
			outgoing_queue.extend ([address, message, 0, result_handler])
			outgoing_queue_lock.unlock
			outgoing_queue_cv.signal -- notify queue manager
		rescue
			outgoing_queue_lock.unlock
		end

	increment_connection_trials: INTEGER is
			-- Increment connection trials counter for current message;
			-- Result is the new counter value
		do
			outgoing_queue_lock.lock
			Result ?= outgoing_queue.item.item (3)
			Result := Result + 1
			outgoing_queue.item.item (3) := Result
			outgoing_queue_lock.unlock
		rescue
			outgoing_queue_lock.unlock
		end

	outgoing_queue_remove is
			-- Remove current message from queue locking the queue
		do
			outgoing_queue_lock.lock
			outgoing_queue.remove
			outgoing_queue_lock.unlock
		rescue
			outgoing_queue_lock.unlock
		end

	has_running_connection (address: STRING): BOOLEAN is
			-- Has currently registered connection for this `address'?
		require
			Status_started: module_status = start_ok
			Address_existent: address /= Void
		do
			connections_lock.acquire_read_lock
			Result := connections.has (address)
			connections_lock.release_reader_lock
		rescue
			connections_lock.release_reader_lock
		end

	call_result_handler (handler: PROCEDURE [ANY, TUPLE [P2P_ENDPOINT_ADDRESS, BOOLEAN]]; address: P2P_ENDPOINT_ADDRESS; success: BOOLEAN) is
			-- Call result handler with `address' and `success', if `handler' is not Void.
			-- Exceptions raised by the handler are logged and not propagated.
		require
			Address_set: address /= Void
		local
			failed: BOOLEAN
		do
			if not failed and handler /= Void then
				handler.call ([address, success])
			end
		rescue
				-- Catch all exceptions from handler
			failed := True
			logger.debugging ("tcp_transport: Exception while handling event result, trace:%N" + exception_trace)
			retry
		end

end