diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs index 1f9e8ac954c..9d82f800825 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs @@ -31,15 +31,16 @@ impl Distribution for Standard { #[rustfmt::skip] pub struct OckamPortalPacket { #[n(0)] pub connection_identifier: ConnectionIdentifier, - #[n(1)] pub sequence: u32, - #[n(2)] pub acknowledgement: u32, - #[n(3)] pub data_offset: u8, - #[n(4)] pub reserved: u8, - #[n(5)] pub flags: u8, - #[n(6)] pub window: u16, - #[n(7)] pub urgent_ptr: u16, - #[n(8)] pub options: Vec, - #[n(9)] pub payload: Vec, + #[n(1)] pub route_index: u32, + #[n(2)] pub sequence: u32, + #[n(3)] pub acknowledgement: u32, + #[n(4)] pub data_offset: u8, + #[n(5)] pub reserved: u8, + #[n(6)] pub flags: u8, + #[n(7)] pub window: u16, + #[n(8)] pub urgent_ptr: u16, + #[n(9)] pub options: Vec, + #[n(10)] pub payload: Vec, } #[allow(missing_docs)] @@ -66,9 +67,11 @@ impl OckamPortalPacket { pub fn from_raw_socket_packet( value: RawSocketPacket, connection_identifier: ConnectionIdentifier, + route_index: u32, ) -> Self { Self { connection_identifier, + route_index, sequence: value.sequence, acknowledgement: value.acknowledgement, data_offset: value.data_offset, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs index 382104a63ce..a902f6ef061 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs @@ -68,10 +68,11 @@ impl Processor for InternalProcessor { }; match &self.mode { + // Client -> Inlet packet PortalMode::Inlet { inlet } => { let inlet_shared_state = inlet.inlet_shared_state.read().unwrap().clone(); - if inlet_shared_state.is_paused { + if inlet_shared_state.is_paused() { return Ok(true); } @@ -114,19 +115,21 @@ impl Processor for InternalProcessor { let portal_packet = OckamPortalPacket::from_raw_socket_packet( parsed_packet.packet, connection.connection_identifier.clone(), + inlet_shared_state.route_index(), ); trace!("Inlet Processor: Got packet, forwarding to the other side"); ctx.forward_from_address( LocalMessage::new() - .with_onward_route(inlet_shared_state.route) + .with_onward_route(inlet_shared_state.route().clone()) .with_return_route(route![inlet.remote_worker_address.clone()]) .with_payload(minicbor::to_vec(portal_packet)?), ctx.address(), ) .await?; } + // Server -> Outlet packet PortalMode::Outlet { outlet } => { let connection = match outlet.get_connection_internal(parsed_packet.packet.destination) { @@ -149,13 +152,16 @@ impl Processor for InternalProcessor { let portal_packet = OckamPortalPacket::from_raw_socket_packet( parsed_packet.packet, connection.connection_identifier.clone(), + 0, // Doesn't matter for the outlet, as outlet can't update the route ); trace!("Outlet Processor: Got packet, forwarding to the other side"); + let return_route = connection.return_route.read().unwrap().route.clone(); + ctx.forward_from_address( LocalMessage::new() - .with_onward_route(connection.return_route.clone()) + .with_onward_route(return_route) .with_return_route(route![outlet.remote_worker_address.clone()]) .with_payload(minicbor::to_vec(portal_packet)?), ctx.address(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs index 68b541ff990..121be5bc278 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs @@ -104,10 +104,10 @@ impl TcpTransport { let write_handle = self.start_raw_socket_processor_if_needed().await?; - let inlet_shared_state = Arc::new(RwLock::new(InletSharedState { - route: outlet_route.clone(), - is_paused: false, - })); + let inlet_shared_state = Arc::new(RwLock::new(InletSharedState::new( + false, + outlet_route.clone(), + ))); let remote_worker_address = Address::random_tagged("Ebpf.RemoteWorker.Inlet"); let internal_worker_address = Address::random_tagged("Ebpf.InternalWorker.Inlet"); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs index 7aedd9a6f83..ffdb1b2ba6c 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs @@ -129,6 +129,24 @@ struct OutletConnectionKey { connection_identifier: ConnectionIdentifier, } +/// Updatable return_route to the Inlet (updatable by the Inlet) +pub struct OutletConnectionReturnRoute { + /// Route + pub route: Route, + /// Number of the route. Starts from 0 and Inlet updates it each time. + pub route_index: u32, +} + +impl OutletConnectionReturnRoute { + /// Constructor. Route index starts with 0 + pub fn new(route: Route) -> Self { + Self { + route, + route_index: 0, + } + } +} + /// Outlet mapping pub struct OutletConnection { /// Identity Identifier of the other side @@ -138,7 +156,7 @@ pub struct OutletConnection { /// Assigned port on our machine for a specific connection pub assigned_port: Port, /// Route to the other side PortalWorker - pub return_route: Route, // TODO: Update it if the inlet updates the route + pub return_route: Arc>, /// To hold the port pub _tcp_listener: Arc, } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs index 19445e5b37e..14812c6c762 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs @@ -1,6 +1,6 @@ use crate::ebpf_portal::{ - Inlet, InletConnection, OckamPortalPacket, Outlet, OutletConnection, Port, - TcpTransportEbpfSupport, + Inlet, InletConnection, OckamPortalPacket, Outlet, OutletConnection, + OutletConnectionReturnRoute, Port, TcpTransportEbpfSupport, }; use log::{debug, trace}; use ockam_core::{async_trait, Any, Result, Route, Routed, Worker}; @@ -9,7 +9,7 @@ use ockam_transport_core::TransportError; use pnet::packet::tcp::MutableTcpPacket; use pnet::transport::TransportSender; use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use tokio::net::TcpListener; use tracing::warn; @@ -86,7 +86,7 @@ impl RemoteWorker { connection_identifier: msg.connection_identifier.clone(), assigned_port, _tcp_listener: Arc::new(tcp_listener), - return_route, + return_route: Arc::new(RwLock::new(OutletConnectionReturnRoute::new(return_route))), }); outlet.add_connection(connection.clone()); @@ -176,7 +176,17 @@ impl RemoteWorker { outlet: &Outlet, connection: &OutletConnection, msg: OckamPortalPacket, + return_route: Route, ) -> Result<()> { + { + let mut connection_return_route = connection.return_route.write().unwrap(); + + if connection_return_route.route_index < msg.route_index { + connection_return_route.route_index = msg.route_index; + connection_return_route.route = return_route; + } + } + self.handle( msg, connection.assigned_port, @@ -205,6 +215,7 @@ impl Worker for RemoteWorker { let identifier = None; // FIXME: Should be the Identifier of the other side match &self.mode { + // Outlet -> Inlet packet PortalMode::Inlet { inlet } => { match inlet.get_connection_external(identifier, msg.connection_identifier.clone()) { Some(connection) => { @@ -217,21 +228,26 @@ impl Worker for RemoteWorker { return Ok(()); } + // Inlet -> Outlet packet PortalMode::Outlet { outlet } => { if let Some(connection) = outlet .get_connection_external(identifier.clone(), msg.connection_identifier.clone()) { - self.handle_outlet(outlet, &connection, msg).await?; + self.handle_outlet(outlet, &connection, msg, return_route) + .await?; return Ok(()); } - if msg.flags == 2 { + // Checks that SYN flag is set, and every other flag is not set. + const SYN: u8 = 2; + if msg.flags == SYN { let connection = self - .new_outlet_connection(outlet, identifier, &msg, return_route) + .new_outlet_connection(outlet, identifier, &msg, return_route.clone()) .await?; - self.handle_outlet(outlet, &connection, msg).await?; + self.handle_outlet(outlet, &connection, msg, return_route) + .await?; return Ok(()); } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index 21532a54a48..bbdb62fe9e3 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -22,8 +22,44 @@ use tracing::{debug, error, instrument}; /// from outside the worker: update the route to the outlet or pause it. #[derive(Debug, Clone)] pub struct InletSharedState { - pub route: Route, - pub is_paused: bool, + route: Route, + is_paused: bool, + // Starts with 0 and increments each time when inlet updates the route to the outlet + // (e.g. when reconnecting), this will allow outlet to figure out what is the most recent + // return_route even if messages arrive out-of-order + route_index: u32, +} + +impl InletSharedState { + pub fn route(&self) -> &Route { + &self.route + } + + pub fn update_route(&mut self, new_route: Route) { + self.route = new_route; + // Overflow here is very unlikely... + self.route_index += 1; + } + + pub fn is_paused(&self) -> bool { + self.is_paused + } + + pub fn set_is_paused(&mut self, is_paused: bool) { + self.is_paused = is_paused; + } + + pub fn route_index(&self) -> u32 { + self.route_index + } + + pub fn new(is_paused: bool, route: Route) -> Self { + Self { + route, + is_paused, + route_index: 0, + } + } } /// A TCP Portal Inlet listen processor @@ -76,6 +112,7 @@ impl TcpInletListenProcessor { let inlet_shared_state = InletSharedState { route: outlet_listener_route, is_paused: options.is_paused, + route_index: 0, }; let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state)); let processor = Self::new(registry, inner, inlet_shared_state.clone(), options); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs index 000ac8846eb..e8efcd1314f 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs @@ -14,6 +14,9 @@ pub enum PortalMessage<'de> { /// or from the target to the Inlet was dropped Disconnect, /// Message with binary payload and packet counter + // TODO: Add route_index. May not be as important as for eBPF portals, as regular portals + // require reliable channel anyways. And if PortalMessage is sent over a channel that + // guarantees ordering, we don't need route_index Payload(&'de [u8], Option), } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index 770887b710e..b54e23742fe 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -222,9 +222,9 @@ impl TcpInlet { ) -> Result<()> { let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - let new_route = Self::build_new_full_route(new_route, &inlet_shared_state.route)?; + let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?; let next = new_route.next()?.clone(); - inlet_shared_state.route = new_route; + inlet_shared_state.update_route(new_route); self.update_flow_controls(flow_controls, next); @@ -235,7 +235,7 @@ impl TcpInlet { pub fn pause(&self) { let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - inlet_shared_state.is_paused = true; + inlet_shared_state.set_is_paused(true); } fn update_flow_controls(&self, flow_controls: &FlowControls, next: Address) { @@ -257,12 +257,12 @@ impl TcpInlet { pub fn unpause(&self, flow_controls: &FlowControls, new_route: Route) -> Result<()> { let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - let new_route = Self::build_new_full_route(new_route, &inlet_shared_state.route)?; + let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?; let next = new_route.next()?.clone(); - inlet_shared_state.route = - Self::build_new_full_route(new_route, &inlet_shared_state.route)?; - inlet_shared_state.is_paused = false; + inlet_shared_state.update_route(new_route); + inlet_shared_state.set_is_paused(false); + self.update_flow_controls(flow_controls, next); Ok(())