diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index 72f4525b3a8..226424ccd2a 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -158,10 +158,9 @@ impl InletSessionReplacer { // Finally, attempt to create/update inlet using the new route let inlet_address = match self.inlet.clone() { Some(inlet) => { - inlet.unpause( - self.context.flow_controls(), - normalized_stripped_route.clone(), - )?; + inlet + .unpause(&self.context, normalized_stripped_route.clone()) + .await?; inlet.processor_address().cloned() } @@ -216,7 +215,7 @@ impl InletSessionReplacer { async fn pause_inlet(&mut self) { if let Some(inlet) = self.inlet.as_mut() { - inlet.pause(); + inlet.pause().await; } } @@ -387,7 +386,7 @@ impl AdditionalSessionReplacer for InletSessionReplacer { additional_sc.update_remote_node_route(route![puncture.sender_address()])?; let new_route = route![additional_sc.clone()]; - inlet.unpause(self.context.flow_controls(), new_route.clone())?; + inlet.unpause(&self.context, new_route.clone()).await?; self.additional_route = Some(new_route.clone()); @@ -401,14 +400,14 @@ impl AdditionalSessionReplacer for InletSessionReplacer { match self.main_route.as_ref() { Some(main_route) if enable_fallback => { // Switch Inlet to the main route - let res = inlet.unpause(self.context.flow_controls(), main_route.clone()); + let res = inlet.unpause(&self.context, main_route.clone()).await; if let Some(err) = res.err() { error!("Error switching Inlet to the main route {}", err); } } _ => { - inlet.pause(); + inlet.pause().await; } } } diff --git a/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs b/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs index dbcf37b99e5..5f8df8d8e9b 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs @@ -1,7 +1,8 @@ -use crate::alloc::string::ToString; use core::fmt::{Debug, Formatter}; use minicbor::{CborLen, Decode, Encode}; +use crate::alloc::string::ToString; + /// Identifier length pub const IDENTIFIER_LEN: usize = 32; diff --git a/implementations/rust/ockam/ockam_transport_core/src/error.rs b/implementations/rust/ockam/ockam_transport_core/src/error.rs index ea1f1bcf459..9f9ee4a2c3e 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/error.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/error.rs @@ -80,6 +80,8 @@ pub enum TransportError { ReadCaps(String), /// eBPF prerequisites check failed EbpfPrerequisitesCheckFailed(String), + /// The Identifier of the other side of the portal has changed when updating the route + IdentifierChanged, } impl ockam_core::compat::error::Error for TransportError {} @@ -127,6 +129,10 @@ impl core::fmt::Display for TransportError { Self::EbpfPrerequisitesCheckFailed(e) => { write!(f, "eBPF prerequisites check failed: {}", e) } + Self::IdentifierChanged => write!( + f, + "identifier of the other side of the portal has changed when updating the route" + ), } } } @@ -167,6 +173,7 @@ impl From for Error { | RemovingOutletPort(_) => Kind::Io, ReadCaps(_) => Kind::Io, EbpfPrerequisitesCheckFailed(_) => Kind::Misuse, + IdentifierChanged => Kind::Conflict, }; Error::new(Origin::Transport, kind, err) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs index a902f6ef061..a1fc5b20812 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/internal_processor.rs @@ -2,8 +2,9 @@ use crate::ebpf_portal::{ Inlet, InletConnection, OckamPortalPacket, Outlet, ParsedRawSocketPacket, PortalMode, }; use log::{debug, trace, warn}; -use ockam_core::{async_trait, route, LocalMessage, Processor, Result}; +use ockam_core::{async_trait, route, LocalInfoIdentifier, LocalMessage, Processor, Result}; use ockam_node::Context; +use ockam_transport_core::TransportError; use rand::random; use std::net::Ipv4Addr; use std::sync::Arc; @@ -36,15 +37,14 @@ impl InternalProcessor { async fn new_inlet_connection( inlet: &Inlet, + their_identifier: Option, src_ip: Ipv4Addr, parsed_packet: &ParsedRawSocketPacket, ) -> Result> { // TODO: eBPF Remove connection eventually - // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet - let connection = Arc::new(InletConnection { - identifier: None, + their_identifier, connection_identifier: random(), inlet_ip: parsed_packet.destination_ip, client_ip: src_ip, @@ -70,7 +70,7 @@ impl Processor for InternalProcessor { match &self.mode { // Client -> Inlet packet PortalMode::Inlet { inlet } => { - let inlet_shared_state = inlet.inlet_shared_state.read().unwrap().clone(); + let inlet_shared_state = inlet.inlet_shared_state.read().await.clone(); if inlet_shared_state.is_paused() { return Ok(true); @@ -86,6 +86,11 @@ impl Processor for InternalProcessor { parsed_packet.packet.source_ip, parsed_packet.packet.source ); + + if connection.their_identifier != inlet_shared_state.their_identifier() { + return Err(TransportError::IdentifierChanged)?; + } + connection } None => { @@ -105,6 +110,7 @@ impl Processor for InternalProcessor { ); Self::new_inlet_connection( inlet, + inlet_shared_state.their_identifier(), parsed_packet.packet.source_ip, &parsed_packet, ) @@ -152,7 +158,7 @@ impl Processor for InternalProcessor { let portal_packet = OckamPortalPacket::from_raw_socket_packet( parsed_packet.packet, connection.connection_identifier.clone(), - 0, // Doesn't matter for the outlet, as outlet can't update the route + 0, // Doesn't matter for the outlet, as outlets can't update the route ); trace!("Outlet Processor: Got packet, forwarding to the other side"); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs index 121be5bc278..5f3158f3a9e 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs @@ -7,11 +7,11 @@ use core::fmt::Debug; use log::{debug, error}; use nix::unistd::Uid; use ockam_core::{Address, DenyAll, Result, Route}; -use ockam_node::compat::asynchronous::resolve_peer; +use ockam_node::compat::asynchronous::{resolve_peer, RwLock}; use ockam_node::{ProcessorBuilder, WorkerBuilder}; use ockam_transport_core::{HostnamePort, TransportError}; use std::net::{IpAddr, SocketAddrV4}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::mpsc::channel; use tracing::instrument; @@ -104,10 +104,9 @@ impl TcpTransport { let write_handle = self.start_raw_socket_processor_if_needed().await?; - let inlet_shared_state = Arc::new(RwLock::new(InletSharedState::new( - false, - outlet_route.clone(), - ))); + let inlet_shared_state = + InletSharedState::create(self.ctx(), outlet_route.clone(), false).await?; + let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state)); let remote_worker_address = Address::random_tagged("Ebpf.RemoteWorker.Inlet"); let internal_worker_address = Address::random_tagged("Ebpf.InternalWorker.Inlet"); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs index cc461e9758e..f2831d45e4a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs @@ -1,16 +1,18 @@ use crate::ebpf_portal::{ConnectionIdentifier, ParsedRawSocketPacket, Port}; use crate::portal::InletSharedState; -use ockam_core::Address; +use ockam_core::compat::sync::Arc; +use ockam_core::compat::sync::RwLock as SyncRwLock; +use ockam_core::{Address, LocalInfoIdentifier}; +use ockam_node::compat::asynchronous::RwLock as AsyncRwLock; use std::collections::HashMap; use std::net::Ipv4Addr; -use std::sync::{Arc, RwLock}; use tokio::net::TcpListener; use tokio::sync::mpsc::Sender; /// Inlet registry #[derive(Default, Clone)] pub(crate) struct InletRegistry { - inlets: Arc>>, + inlets: Arc>>, } impl InletRegistry { @@ -29,7 +31,7 @@ impl InletRegistry { sender: Sender, port: Port, tcp_listener: TcpListener, - inlet_shared_state: Arc>, + inlet_shared_state: Arc>, ) -> Inlet { let mut inlets = self.inlets.write().unwrap(); @@ -69,12 +71,12 @@ pub struct Inlet { /// Port pub port: Port, /// Route to the corresponding Outlet - pub inlet_shared_state: Arc>, + pub inlet_shared_state: Arc>, /// Hold to mark the port as taken pub _tcp_listener: Arc, /// Same map with different key - connections1: Arc>>>, - connections2: Arc>>>, + connections1: Arc>>>, + connections2: Arc>>>, } impl Inlet { @@ -89,7 +91,7 @@ impl Inlet { ); self.connections2.write().unwrap().insert( InletConnectionKey2 { - identifier: connection.identifier.clone(), + their_identifier: connection.their_identifier.clone(), connection_identifier: connection.connection_identifier.clone(), }, connection, @@ -115,14 +117,14 @@ impl Inlet { /// Get mapping pub(crate) fn get_connection_external( &self, - identifier: Option, // Identity + their_identifier: Option, // Identity connection_identifier: ConnectionIdentifier, ) -> Option> { self.connections2 .read() .unwrap() .get(&InletConnectionKey2 { - identifier, + their_identifier, connection_identifier, }) .cloned() @@ -137,14 +139,14 @@ struct InletConnectionKey1 { #[derive(Hash, PartialEq, Eq)] struct InletConnectionKey2 { - identifier: Option, + their_identifier: Option, connection_identifier: ConnectionIdentifier, } /// Inlet Mapping pub struct InletConnection { /// Identity Identifier of the other side - pub identifier: Option, + pub their_identifier: Option, /// Unique connection Identifier pub connection_identifier: ConnectionIdentifier, /// We can listen of multiple IPs diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs index ffdb1b2ba6c..9ebc5d77b91 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs @@ -1,5 +1,5 @@ use crate::ebpf_portal::{ConnectionIdentifier, ParsedRawSocketPacket, Port}; -use ockam_core::{Address, Route}; +use ockam_core::{Address, LocalInfoIdentifier, Route}; use std::collections::HashMap; use std::net::Ipv4Addr; use std::sync::{Arc, RwLock}; @@ -87,7 +87,7 @@ impl Outlet { .insert(connection.assigned_port, connection.clone()); self.connections2.write().unwrap().insert( OutletConnectionKey { - identifier: connection.identifier.clone(), + their_identifier: connection.their_identifier.clone(), connection_identifier: connection.connection_identifier.clone(), }, connection, @@ -109,14 +109,14 @@ impl Outlet { /// Get mapping pub(crate) fn get_connection_external( &self, - identifier: Option, // Identity + their_identifier: Option, // Identity connection_identifier: ConnectionIdentifier, ) -> Option> { self.connections2 .read() .unwrap() .get(&OutletConnectionKey { - identifier, + their_identifier, connection_identifier, }) .cloned() @@ -125,7 +125,7 @@ impl Outlet { #[derive(Hash, PartialEq, Eq)] struct OutletConnectionKey { - identifier: Option, + their_identifier: Option, connection_identifier: ConnectionIdentifier, } @@ -150,7 +150,7 @@ impl OutletConnectionReturnRoute { /// Outlet mapping pub struct OutletConnection { /// Identity Identifier of the other side - pub identifier: Option, + pub their_identifier: Option, /// Unique connection Identifier pub connection_identifier: ConnectionIdentifier, /// Assigned port on our machine for a specific connection diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs index 14812c6c762..b18d0d5c104 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/remote_worker.rs @@ -3,7 +3,9 @@ use crate::ebpf_portal::{ OutletConnectionReturnRoute, Port, TcpTransportEbpfSupport, }; use log::{debug, trace}; -use ockam_core::{async_trait, Any, Result, Route, Routed, Worker}; +use ockam_core::{ + async_trait, Any, LocalInfoIdentifier, Result, Route, Routed, SecureChannelLocalInfo, Worker, +}; use ockam_node::Context; use ockam_transport_core::TransportError; use pnet::packet::tcp::MutableTcpPacket; @@ -65,7 +67,7 @@ impl RemoteWorker { async fn new_outlet_connection( &self, outlet: &Outlet, - identifier: Option, + identifier: Option, msg: &OckamPortalPacket, return_route: Route, ) -> Result> { @@ -82,7 +84,7 @@ impl RemoteWorker { debug!("New TCP connection. Assigned socket {}", local_addr); let connection = Arc::new(OutletConnection { - identifier, + their_identifier: identifier, connection_identifier: msg.connection_identifier.clone(), assigned_port, _tcp_listener: Arc::new(tcp_listener), @@ -207,17 +209,20 @@ impl Worker for RemoteWorker { _ctx: &mut Self::Context, msg: Routed, ) -> Result<()> { + let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) + .map(|l| l.their_identifier()) + .ok(); let return_route = msg.return_route(); let payload = msg.into_payload(); let msg: OckamPortalPacket = minicbor::decode(&payload)?; - let identifier = None; // FIXME: Should be the Identifier of the other side - match &self.mode { // Outlet -> Inlet packet PortalMode::Inlet { inlet } => { - match inlet.get_connection_external(identifier, msg.connection_identifier.clone()) { + match inlet + .get_connection_external(their_identifier, msg.connection_identifier.clone()) + { Some(connection) => { self.handle_inlet(inlet, &connection, msg).await?; } @@ -230,9 +235,10 @@ impl Worker for RemoteWorker { } // Inlet -> Outlet packet PortalMode::Outlet { outlet } => { - if let Some(connection) = outlet - .get_connection_external(identifier.clone(), msg.connection_identifier.clone()) - { + if let Some(connection) = outlet.get_connection_external( + their_identifier.clone(), + msg.connection_identifier.clone(), + ) { self.handle_outlet(outlet, &connection, msg, return_route) .await?; @@ -243,7 +249,7 @@ impl Worker for RemoteWorker { const SYN: u8 = 2; if msg.flags == SYN { let connection = self - .new_outlet_connection(outlet, identifier, &msg, return_route.clone()) + .new_outlet_connection(outlet, their_identifier, &msg, return_route.clone()) .await?; self.handle_outlet(outlet, &connection, msg, return_route) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index bbdb62fe9e3..07e2941b9e6 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -1,13 +1,14 @@ use crate::portal::addresses::{Addresses, PortalType}; use crate::portal::tls_certificate::TlsCertificateProvider; -use crate::portal::{ReadHalfMaybeTls, WriteHalfMaybeTls}; +use crate::portal::{InletSharedState, ReadHalfMaybeTls, WriteHalfMaybeTls}; use crate::{portal::TcpPortalWorker, TcpInlet, TcpInletOptions, TcpRegistry}; use log::warn; use ockam_core::compat::net::SocketAddr; -use ockam_core::compat::sync::{Arc, RwLock}; +use ockam_core::compat::sync::Arc; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{async_trait, compat::boxed::Box, Result}; use ockam_core::{Address, Processor, Route}; +use ockam_node::compat::asynchronous::RwLock; use ockam_node::Context; use ockam_transport_core::{HostnamePort, TransportError}; use rustls::pki_types::CertificateDer; @@ -18,50 +19,6 @@ use tokio::time::Instant; use tokio_rustls::{TlsAcceptor, TlsStream}; use tracing::{debug, error, instrument}; -/// State shared between `TcpInletListenProcessor` and `TcpInlet` to allow manipulating its state -/// from outside the worker: update the route to the outlet or pause it. -#[derive(Debug, Clone)] -pub struct InletSharedState { - route: Route, - is_paused: bool, - // Starts with 0 and increments each time when inlet updates the route to the outlet - // (e.g. when reconnecting), this will allow outlet to figure out what is the most recent - // return_route even if messages arrive out-of-order - route_index: u32, -} - -impl InletSharedState { - pub fn route(&self) -> &Route { - &self.route - } - - pub fn update_route(&mut self, new_route: Route) { - self.route = new_route; - // Overflow here is very unlikely... - self.route_index += 1; - } - - pub fn is_paused(&self) -> bool { - self.is_paused - } - - pub fn set_is_paused(&mut self, is_paused: bool) { - self.is_paused = is_paused; - } - - pub fn route_index(&self) -> u32 { - self.route_index - } - - pub fn new(is_paused: bool, route: Route) -> Self { - Self { - route, - is_paused, - route_index: 0, - } - } -} - /// A TCP Portal Inlet listen processor /// /// TCP Portal Inlet listen processors are created by `TcpTransport` @@ -109,11 +66,8 @@ impl TcpInletListenProcessor { } }; let socket_addr = inner.local_addr().map_err(TransportError::from)?; - let inlet_shared_state = InletSharedState { - route: outlet_listener_route, - is_paused: options.is_paused, - route_index: 0, - }; + let inlet_shared_state = + InletSharedState::create(ctx, outlet_listener_route, options.is_paused).await?; let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state)); let processor = Self::new(registry, inner, inlet_shared_state.clone(), options); @@ -227,9 +181,9 @@ impl Processor for TcpInletListenProcessor { let addresses = Addresses::generate(PortalType::Inlet); - let inlet_shared_state = self.inlet_shared_state.read().unwrap().clone(); + let inlet_shared_state = self.inlet_shared_state.read().await.clone(); - if inlet_shared_state.is_paused { + if inlet_shared_state.is_paused() { // Just drop the stream return Ok(true); } @@ -237,7 +191,7 @@ impl Processor for TcpInletListenProcessor { TcpInletOptions::setup_flow_control( ctx.flow_controls(), &addresses, - inlet_shared_state.route.next()?, + inlet_shared_state.route().next()?, ); let streams = if let Some(certificate_provider) = &self.options.tls_certificate_provider { @@ -262,13 +216,13 @@ impl Processor for TcpInletListenProcessor { ) }; - // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet TcpPortalWorker::start_new_inlet( ctx, self.registry.clone(), streams, HostnamePort::from(socket_addr), - inlet_shared_state.route, + inlet_shared_state.route().clone(), + inlet_shared_state.their_identifier(), addresses, self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_shared_state.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_shared_state.rs new file mode 100644 index 00000000000..4322b3458b8 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_shared_state.rs @@ -0,0 +1,78 @@ +use ockam_core::{LocalInfoIdentifier, Result, Route, SecureChannelMetadata}; +use ockam_node::Context; + +/// State shared between `TcpInletListenProcessor` and `TcpInlet` to allow manipulating its state +/// from outside the worker: update the route to the outlet or pause it. +#[derive(Debug, Clone)] +pub struct InletSharedState { + route: Route, + // Identifier of the other side + // The identifier is always the same for the same route as is obtained from the first local + // secure channel on the route. However, we should recheck that identifier hasn't changed + // when updating the route. + their_identifier: Option, + is_paused: bool, + // Starts with 0 and increments each time when inlet updates the route to the outlet + // (e.g. when reconnecting), this will allow outlet to figure out what is the most recent + // return_route even if messages arrive out-of-order + route_index: u32, +} + +impl InletSharedState { + pub async fn create(ctx: &Context, route: Route, is_paused: bool) -> Result { + let their_identifier = + if let Some(terminal) = ctx.find_terminal_address(route.clone()).await? { + SecureChannelMetadata::from_terminal_address(&terminal) + .map(|m| m.their_identifier()) + .ok() + } else { + None + }; + + Ok(Self { + route, + their_identifier, + is_paused, + route_index: 0, + }) + } + + pub fn route(&self) -> &Route { + &self.route + } + + pub fn their_identifier(&self) -> Option { + self.their_identifier.clone() + } + + pub fn is_paused(&self) -> bool { + self.is_paused + } + + pub fn route_index(&self) -> u32 { + self.route_index + } + + pub async fn update_route(&mut self, ctx: &Context, new_route: Route) -> Result<()> { + let their_identifier = + if let Some(terminal) = ctx.find_terminal_address(new_route.clone()).await? { + SecureChannelMetadata::from_terminal_address(&terminal) + .map(|m| m.their_identifier()) + .ok() + } else { + None + }; + + self.their_identifier = their_identifier; + + self.route = new_route; + // Overflow here is very unlikely... + self.route_index += 1; + + Ok(()) + } + + pub fn set_is_paused(&mut self, is_paused: bool) { + self.is_paused = is_paused; + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs index 1112d6c4d21..069634bbd5d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs @@ -1,5 +1,6 @@ pub mod addresses; mod inlet_listener; +mod inlet_shared_state; mod interceptor; pub mod options; mod outlet_listener; @@ -9,6 +10,7 @@ mod portal_worker; mod tls_certificate; pub(crate) use inlet_listener::*; +pub(crate) use inlet_shared_state::*; pub use interceptor::{ Direction, PortalInletInterceptor, PortalInterceptor, PortalInterceptorFactory, PortalInterceptorWorker, PortalOutletInterceptor, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs index 1d5b727ae4b..aded5a09abe 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs @@ -1,6 +1,8 @@ use crate::portal::addresses::{Addresses, PortalType}; use crate::{portal::TcpPortalWorker, PortalMessage, TcpOutletOptions, TcpRegistry}; -use ockam_core::{async_trait, Address, DenyAll, NeutralMessage, Result, Routed, Worker}; +use ockam_core::{ + async_trait, Address, DenyAll, NeutralMessage, Result, Routed, SecureChannelLocalInfo, Worker, +}; use ockam_node::{Context, WorkerBuilder}; use ockam_transport_core::{HostnamePort, TransportError}; use tracing::{debug, instrument}; @@ -75,6 +77,9 @@ impl Worker for TcpOutletListenWorker { ctx: &mut Self::Context, msg: Routed, ) -> Result<()> { + let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) + .map(|l| l.their_identifier()) + .ok(); let return_route = msg.return_route(); let src_addr = msg.src_addr(); let body = msg.into_body()?.into_vec(); @@ -88,14 +93,13 @@ impl Worker for TcpOutletListenWorker { TcpOutletOptions::setup_flow_control_for_outlet(ctx.flow_controls(), &addresses, &src_addr); - // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet - TcpPortalWorker::start_new_outlet( ctx, self.registry.clone(), self.hostname_port.clone(), self.options.tls, return_route.clone(), + their_identifier, addresses.clone(), self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index b2a7180b366..7590e7071f8 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -6,7 +6,7 @@ use crate::{portal::TcpPortalRecvProcessor, PortalInternalMessage, PortalMessage use ockam_core::compat::{boxed::Box, sync::Arc}; use ockam_core::{ async_trait, AllowOnwardAddress, AllowSourceAddress, Decodable, DenyAll, IncomingAccessControl, - Mailbox, Mailboxes, OutgoingAccessControl, + LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, SecureChannelLocalInfo, }; use ockam_core::{Any, Result, Route, Routed, Worker}; use ockam_node::{Context, ProcessorBuilder, WorkerBuilder}; @@ -41,6 +41,7 @@ enum State { pub(crate) struct TcpPortalWorker { registry: TcpRegistry, state: State, + their_identifier: Option, write_half: Option, read_half: Option, hostname_port: HostnamePort, @@ -73,6 +74,7 @@ impl TcpPortalWorker { streams: (ReadHalfMaybeTls, WriteHalfMaybeTls), hostname_port: HostnamePort, ping_route: Route, + their_identifier: Option, addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, // To propagate to the receiver @@ -83,6 +85,7 @@ impl TcpPortalWorker { hostname_port, false, State::SendPing { ping_route }, + their_identifier, Some(streams), addresses, incoming_access_control, @@ -100,6 +103,7 @@ impl TcpPortalWorker { hostname_port: HostnamePort, tls: bool, pong_route: Route, + their_identifier: Option, addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, @@ -110,6 +114,7 @@ impl TcpPortalWorker { hostname_port, tls, State::SendPong { pong_route }, + their_identifier, None, addresses, incoming_access_control, @@ -127,6 +132,7 @@ impl TcpPortalWorker { hostname_port: HostnamePort, is_tls: bool, state: State, + their_identifier: Option, streams: Option<(ReadHalfMaybeTls, WriteHalfMaybeTls)>, addresses: Addresses, incoming_access_control: Arc, @@ -156,6 +162,7 @@ impl TcpPortalWorker { let worker = Self { registry, state, + their_identifier, write_half: tx, read_half: rx, hostname_port, @@ -449,6 +456,14 @@ impl Worker for TcpPortalWorker { return Ok(()); } + let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) + .map(|l| l.their_identifier()) + .ok(); + + if their_identifier != self.their_identifier { + return Err(TransportError::IdentifierChanged)?; + } + // Remove our own address from the route so the other end // knows what to do with the incoming message let state = self.clone_state(); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index b54e23742fe..6465368d4dd 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -3,9 +3,10 @@ use crate::{portal::TcpOutletListenWorker, TcpInletOptions, TcpOutletOptions, Tc use core::fmt; use core::fmt::{Debug, Formatter}; use ockam_core::compat::net::SocketAddr; -use ockam_core::compat::sync::{Arc, RwLock}; +use ockam_core::compat::sync::Arc; use ockam_core::flow_control::FlowControls; use ockam_core::{route, Address, Result, Route}; +use ockam_node::compat::asynchronous::RwLock; use ockam_node::Context; use ockam_transport_core::{parse_socket_addr, HostnamePort}; use tracing::instrument; @@ -192,6 +193,11 @@ impl TcpInlet { } } + /// Returns true if the Inlet is eBPF + pub fn is_ebpf(&self) -> bool { + matches!(self.state, TcpInletState::Ebpf { .. }) + } + /// Socket Address pub fn socket_address(&self) -> SocketAddr { self.socket_address @@ -213,27 +219,25 @@ impl TcpInlet { /// Update the route to the outlet node. /// This is useful if we re-create a secure channel if because, e.g., the other node wasn't /// reachable, or if we want to switch transport, e.g., from relayed to UDP NAT puncture. - /// NOTE: Existing TCP connections will still use the old route, + /// NOTE: For regular Portals existing TCP connections will still use the old route, /// only newly accepted connections will use the new route. - pub fn update_outlet_node_route( - &self, - flow_controls: &FlowControls, - new_route: Route, - ) -> Result<()> { - let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); + /// For eBPF Portals old connections can continue work in case the Identifier of the + /// Outlet node didn't change + pub async fn update_outlet_node_route(&self, ctx: &Context, new_route: Route) -> Result<()> { + let mut inlet_shared_state = self.inlet_shared_state.write().await; let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?; let next = new_route.next()?.clone(); - inlet_shared_state.update_route(new_route); + inlet_shared_state.update_route(ctx, new_route).await?; - self.update_flow_controls(flow_controls, next); + self.update_flow_controls(ctx.flow_controls(), next); Ok(()) } /// Pause TCP Inlet, all incoming TCP streams will be dropped. - pub fn pause(&self) { - let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); + pub async fn pause(&self) { + let mut inlet_shared_state = self.inlet_shared_state.write().await; inlet_shared_state.set_is_paused(true); } @@ -254,16 +258,16 @@ impl TcpInlet { } /// Unpause TCP Inlet and update the outlet route. - pub fn unpause(&self, flow_controls: &FlowControls, new_route: Route) -> Result<()> { - let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); + pub async fn unpause(&self, ctx: &Context, new_route: Route) -> Result<()> { + let mut inlet_shared_state = self.inlet_shared_state.write().await; let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?; let next = new_route.next()?.clone(); - inlet_shared_state.update_route(new_route); + inlet_shared_state.update_route(ctx, new_route).await?; inlet_shared_state.set_is_paused(false); - self.update_flow_controls(flow_controls, next); + self.update_flow_controls(ctx.flow_controls(), next); Ok(()) } diff --git a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs index 08ab03d0de5..30f509eaf01 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs @@ -300,7 +300,9 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { node_connection1.stop(ctx).await?; - inlet.update_outlet_node_route(ctx.flow_controls(), route![node_connection2])?; + inlet + .update_outlet_node_route(ctx, route![node_connection2]) + .await?; let mut stream = TcpStream::connect(inlet.socket_address()).await.unwrap(); write_binary(&mut stream, payload1).await;