indexing description: "Handles a TCP connection to a peer" license: "MIT license (see ../../../license.txt)" author: "Beat Strasser " date: "$Date$" revision: "$Revision$" class P2P_TCP_CONNECTION inherit EXCEPTIONS P2P_SOCKET_EXTENSIONS rename make as esocket_make, make_non_blocking as esocket_make_non_blocking export {NONE} all end P2P_CREATORS_SHARED P2P_EXCEPTION_LOG create make_from_socket, make_new, ping feature {NONE} -- Initialization make_from_socket (transport: like mytransport; a_socket: like socket) is -- Create connection from already opened socket (incoming connection) require Transport_existent: transport /= Void Socket_valid: a_socket /= Void and a_socket.exists do mytransport := transport socket := a_socket set_socket_options esocket_make_non_blocking (a_socket, socket_read_timeout_short) set_checking_restraint (agent is_closed) local_no_propagate := False create socket_lock.make local_address := mytransport.local_address create remote_address.make (mytransport.name, socket.peer_address.host_address.host_address + ":" + socket.peer_address.port.out) mytransport.extend_connection (Current, remote_address.out_without_service) status := status_opened -- Enqueue job for connection lifecycle mytransport.logger.debugging ("tcp_connection: Enqueue connection lifecycle handling job for open connection, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) mytransport.peer_group.job_queue.force_with_tag (agent handle_lifecycle, mytransport.job_queue_tag) end make_new (transport: like mytransport; address: P2P_ENDPOINT_ADDRESS) is -- Create new connection to `address' require Transport_existent: transport /= Void Address_valid: address /= Void do mytransport := transport local_address := mytransport.local_address remote_address := address local_no_propagate := False -- doesn't matter where we get propagation messages create socket_lock.make status := status_opening mytransport.extend_connection (Current, remote_address.out_without_service) -- Enqueue job for connection lifecycle mytransport.logger.debugging ("tcp_connection: Enqueue connection lifecycle handling job for new connection, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) mytransport.peer_group.job_queue.force_with_tag (agent handle_lifecycle, mytransport.job_queue_tag) end ping (transport: like mytransport; address: P2P_ENDPOINT_ADDRESS) is -- Ping `address' by creating connection, do a simple handshake and close the connection. -- Set `status' to `status_closed' if ping ok or else to `status_connect_failed' on any error. require Transport_existent: transport /= Void Address_valid: address /= Void local failed: BOOLEAN do status := status_connect_failed if not failed then mytransport := transport local_address := mytransport.local_address remote_address := address local_no_propagate := True -- create socket and try to open connection create {NETWORK_STREAM_SOCKET} socket.make_client_by_port (remote_address.ip_port, remote_address.ip_host) set_socket_options esocket_make_non_blocking (socket, socket_read_timeout_short) set_checking_restraint (agent is_closed) -- TODO change to non blocking as soon as eiffel net supports non blocking calls socket.set_blocking socket.connect socket.set_non_blocking if connect_successful then mytransport.logger.debugging ("tcp_connection: Handshaking with remote peer, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) -- send welcome message send_welcome_message -- wait for welcome message (blocking) read_welcome_message status := status_closed else mytransport.logger.error ("tcp_connection: Error opening connection, address: " + remote_address.out_without_service + ", error: " + socket.error + ", group id: " + mytransport.peer_group.id.out) end else transport.logger.debugging ("tcp_connection: Error pinging peer, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) end -- close connection socket.cleanup ensure Status_set: status = status_closed or status = status_connect_failed rescue failed := True log_exceptions retry end feature -- Access remote_address: P2P_ENDPOINT_ADDRESS remote_no_propagate: BOOLEAN remote_peer_id: P2P_PEER_ID local_address: P2P_ENDPOINT_ADDRESS local_no_propagate: BOOLEAN feature -- Status report status: INTEGER Status_opening: INTEGER is 1 -- When tcp connection gets created Status_opened: INTEGER is 2 -- When tcp connection has been created, but welcome messages aren't sent/received yet Status_welcome_send_failed: INTEGER is 3 -- When tcp connection has been created, but sending of welcome message failed Status_welcome_read_failed: INTEGER is 4 -- When tcp connection has been created, but reading of welcome message failed Status_ready: INTEGER is 5 -- When tcp connection is ready to send/receive normal messages Status_closed: INTEGER is 6 -- When tcp connection is closed again Status_connect_failed: INTEGER is 7 -- When tcp connection can't be established is_ready: BOOLEAN is -- Is connection ready for sending/receiving messages? do Result := status = status_ready ensure Result_set: Result = (status = status_ready) end is_closed: BOOLEAN is -- Is connection closed? do Result := status = status_closed ensure Result_set: Result = (status = status_closed) end last_send_failed: BOOLEAN -- Has last sending of message failed? feature -- Basic operations close is -- Close current connection do if status /= status_closed then mytransport.logger.debugging ("tcp_connection: Requesting connection close, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) status := status_closed end end send_message (message: P2P_MESSAGE) is -- Send message require Status_ready: is_ready local wire_body: P2P_WIRE_MESSAGE wire_header: P2P_TCP_HEADER error: STRING do if error = Void then mytransport.logger.debugging ("tcp_connection: Sending message, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) -- body wire_body := wire_message_creator.create_from_message (wire_message_creator.mimetype_binary, message) wire_body.send_to_buffer (mytransport.wire_message_version) -- required headers create wire_header.make wire_header.set_content_type (mytransport.wire_message_format_mimetype) wire_header.set_content_length (wire_body.buffer_length) wire_header.send_to_buffer -- send write_chars (wire_header.buffer) write_chars (wire_body.buffer) socket_lock.lock send_buffer socket_lock.unlock last_send_failed := False mytransport.logger.debugging ("tcp_connection: Message sent, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) else last_send_failed := True mytransport.logger.debugging ("tcp_connection: Error sending message, address: " + remote_address.out_without_service + ", error: " + error + ", group id: " + mytransport.peer_group.id.out) end rescue log_exceptions error := socket.error socket_lock.unlock close -- close connection on any failure retry end feature {NONE} -- Implementation handle_lifecycle is -- Thread main routine do mytransport.logger.debugging ("tcp_connection: Executing connection lifecycle job, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) if status = status_opening then -- open new connection mytransport.logger.debugging ("tcp_connection: Opening connection, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) socket_lock.lock create {NETWORK_STREAM_SOCKET} socket.make_client_by_port (remote_address.ip_port, remote_address.ip_host) set_socket_options esocket_make_non_blocking (socket, socket_read_timeout_short) set_checking_restraint (agent is_closed) -- TODO change to non blocking as soon as eiffel net supports non blocking calls socket.set_blocking socket.connect socket.set_non_blocking if connect_successful then status := status_opened mytransport.logger.debugging ("tcp_connection: Connection opened, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) else status := status_connect_failed mytransport.logger.debugging ("tcp_connection: Error opening connection, address: " + remote_address.out_without_service + ", error: " + socket.error + ", group id: " + mytransport.peer_group.id.out) end socket_lock.unlock end if status = status_opened then mytransport.logger.debugging ("tcp_connection: Handshaking with remote peer, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) socket_lock.lock -- send welcome message send_welcome_message -- wait for welcome message (blocking) read_welcome_message status := status_ready mytransport.logger.debugging ("tcp_connection: Handshake complete, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) socket_lock.unlock end if status = status_ready then -- wait for incoming data and process until status failure from until status /= status_ready loop -- wait for input until timed out (see `socket_read_timeout_long') mytransport.logger.debugging ("tcp_connection: Waiting for new message, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) set_timeout (socket_read_timeout_long) if ready_for_reading_or_failure then read_incoming_message else mytransport.logger.debugging ("tcp_connection: Timed out waiting for new message, now closing, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) status := status_closed -- timed out, no input end end end -- close connection mytransport.logger.debugging ("tcp_connection: Cleaning up connection, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) status := status_closed mytransport.prune_connection (Current, remote_address.out_without_service) if socket_lock.is_set then socket_lock.lock if socket /= Void then socket.cleanup end socket_lock.unlock end -- stop lifecycle handling job now mytransport.logger.debugging ("tcp_connection: Connection closed, finishing connection lifecycle job, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) rescue log_exceptions status := status_closed if socket_lock.is_set then socket_lock.unlock end retry end mytransport: P2P_TCP_TRANSPORT socket_lock: MUTEX Socket_read_timeout_long: INTEGER is 300 -- 5 minutes Socket_read_timeout_short: INTEGER is 10 -- 10 seconds Socket_receive_buffer_size: INTEGER is 65536 -- 64 KBytes Socket_send_buffer_size: INTEGER is 65536 -- 64 KBytes Welcome_greeting: STRING is "JXTAHELLO" Welcome_space: INTEGER_8 is 0x20 Welcome_propagate: CHARACTER_8 is '0' Welcome_no_propagate: CHARACTER_8 is '1' Welcome_version: STRING is "1.1" Welcome_cr: INTEGER_8 is 0x0d Welcome_lf: INTEGER_8 is 0x0a Welcome_message_length_guess: INTEGER is 512 Welcome_message_length_max: INTEGER is 4096 Wire_message_format_mimetype: STRING is "application/x-jxta-msg" Wire_message_version: INTEGER is 1 set_socket_options is -- Set socket options require Socket_existent: socket /= Void do end read_welcome_message is -- Read welcome message from wire local welcome: STRING parts: LIST [STRING] our_addr, other_addr: P2P_ENDPOINT_ADDRESS do -- read line (until [CR]LF) create welcome.make (welcome_message_length_guess) read_byte_1 from until last_integer_8.to_character_8 = '%N' or welcome.count > welcome_message_length_max loop welcome.extend (last_integer_8.to_character_8) read_byte_1 end welcome.prune_all_trailing ('%R') if welcome.count > welcome_message_length_max then raise ("No welcome message found") end -- parse line parts := welcome.split (welcome_space.to_character_8) if parts.count /= 6 then raise ("No valid welcome message found") end -- greeting if not parts[1].is_equal (welcome_greeting) then raise ("Reading welcome message greeting failed") end -- welcome_destination create our_addr.make_from_uri (parts[2]) if not our_addr.is_valid then raise ("Welcome destination endpoint address invalid") end local_address := our_addr -- welcome_source create other_addr.make_from_uri (parts[3]) if not other_addr.is_valid then raise ("Welcome destination endpoint address invalid") end if not remote_address.is_equal_without_service (other_addr) then mytransport.extend_connection (Current, other_addr.out_without_service) mytransport.prune_connection (Current, remote_address.out_without_service) remote_address := other_addr end -- welcome_peer create remote_peer_id.make_from_urn (parts[4]) if not remote_peer_id.is_valid then raise ("Remote peer id invalid") end -- noprop remote_no_propagate := parts[5].item (1) = welcome_no_propagate -- version if not parts[6].is_equal (welcome_version) then raise ("Invalid message version") end end send_welcome_message is -- Send welcome message to wire do -- greeting space write_chars (welcome_greeting) write_byte_1 (welcome_space) -- welcome_destination space write_chars (remote_address.out_without_service) write_byte_1 (welcome_space) -- welcome_source space write_chars (local_address.out_without_service) write_byte_1 (welcome_space) -- welcome_peer space write_chars (mytransport.peer_group.peer_id.out) write_byte_1 (welcome_space) -- noprop space if local_no_propagate then write_chars (welcome_no_propagate.out) else write_chars (welcome_propagate.out) end write_byte_1 (welcome_space) -- version crlf write_chars (welcome_version) write_byte_1 (welcome_cr) write_byte_1 (welcome_lf) send_buffer end read_incoming_message is -- Read received data and process message local wire_header: P2P_TCP_HEADER wire_body_parser: P2P_WIRE_MESSAGE do -- read header socket_lock.lock if socket.exists then create wire_header.make_from_wire (socket, socket_read_timeout_short, agent is_closed) if wire_header.has_failed or not wire_header.is_valid or wire_header.content_length <= 0 or not wire_header.content_type.is_equal (mytransport.wire_message_format_mimetype) then -- close connection due to socket troubles, unrecognized content type or invalid content length socket_lock.unlock close else mytransport.logger.debugging ("tcp_connection: Reading new incoming message, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) -- read body wire_body_parser := wire_message_creator.create_from_wire (wire_message_creator.mimetype_binary, socket, socket_read_timeout_short, agent is_closed) socket_lock.unlock if wire_body_parser.has_failed then -- close connection on socket troubles or parsing failures mytransport.logger.debugging ("tcp_connection: Error parsing read message, address: " + remote_address.out_without_service + ", group id: " + mytransport.peer_group.id.out) close else -- send message to upper layers mytransport.process_incoming_message (wire_body_parser.message) end end else status := status_closed socket_lock.unlock end end end