Skip to content

Commit

Permalink
feat(rust): tie each tcp connection inside portal to an Identifier
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Oct 29, 2024
1 parent 3a220f5 commit 5cec5ca
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ impl InletSessionReplacer {
// Finally, attempt to create/update inlet using the new route
let inlet_address = match self.inlet.clone() {
Some(inlet) => {
inlet.unpause(
self.context.flow_controls(),
normalized_stripped_route.clone(),
)?;
inlet
.unpause(&self.context, normalized_stripped_route.clone())
.await?;

inlet.processor_address().cloned()
}
Expand Down Expand Up @@ -216,7 +215,7 @@ impl InletSessionReplacer {

async fn pause_inlet(&mut self) {
if let Some(inlet) = self.inlet.as_mut() {
inlet.pause();
inlet.pause().await;
}
}

Expand Down Expand Up @@ -387,7 +386,7 @@ impl AdditionalSessionReplacer for InletSessionReplacer {
additional_sc.update_remote_node_route(route![puncture.sender_address()])?;

let new_route = route![additional_sc.clone()];
inlet.unpause(self.context.flow_controls(), new_route.clone())?;
inlet.unpause(&self.context, new_route.clone()).await?;

self.additional_route = Some(new_route.clone());

Expand All @@ -401,14 +400,14 @@ impl AdditionalSessionReplacer for InletSessionReplacer {
match self.main_route.as_ref() {
Some(main_route) if enable_fallback => {
// Switch Inlet to the main route
let res = inlet.unpause(self.context.flow_controls(), main_route.clone());
let res = inlet.unpause(&self.context, main_route.clone()).await;

if let Some(err) = res.err() {
error!("Error switching Inlet to the main route {}", err);
}
}
_ => {
inlet.pause();
inlet.pause().await;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::alloc::string::ToString;
use core::fmt::{Debug, Formatter};
use minicbor::{CborLen, Decode, Encode};

use crate::alloc::string::ToString;

/// Identifier length
pub const IDENTIFIER_LEN: usize = 32;

Expand Down
7 changes: 7 additions & 0 deletions implementations/rust/ockam/ockam_transport_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub enum TransportError {
ReadCaps(String),
/// eBPF prerequisites check failed
EbpfPrerequisitesCheckFailed(String),
/// The Identifier of the other side of the portal has changed when updating the route
IdentifierChanged,
}

impl ockam_core::compat::error::Error for TransportError {}
Expand Down Expand Up @@ -127,6 +129,10 @@ impl core::fmt::Display for TransportError {
Self::EbpfPrerequisitesCheckFailed(e) => {
write!(f, "eBPF prerequisites check failed: {}", e)
}
Self::IdentifierChanged => write!(
f,
"identifier of the other side of the portal has changed when updating the route"
),
}
}
}
Expand Down Expand Up @@ -167,6 +173,7 @@ impl From<TransportError> for Error {
| RemovingOutletPort(_) => Kind::Io,
ReadCaps(_) => Kind::Io,
EbpfPrerequisitesCheckFailed(_) => Kind::Misuse,
IdentifierChanged => Kind::Conflict,
};

Error::new(Origin::Transport, kind, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::ebpf_portal::{
Inlet, InletConnection, OckamPortalPacket, Outlet, ParsedRawSocketPacket, PortalMode,
};
use log::{debug, trace, warn};
use ockam_core::{async_trait, route, LocalMessage, Processor, Result};
use ockam_core::{async_trait, route, LocalInfoIdentifier, LocalMessage, Processor, Result};
use ockam_node::Context;
use ockam_transport_core::TransportError;
use rand::random;
use std::net::Ipv4Addr;
use std::sync::Arc;
Expand Down Expand Up @@ -36,15 +37,14 @@ impl InternalProcessor {

async fn new_inlet_connection(
inlet: &Inlet,
their_identifier: Option<LocalInfoIdentifier>,
src_ip: Ipv4Addr,
parsed_packet: &ParsedRawSocketPacket,
) -> Result<Arc<InletConnection>> {
// TODO: eBPF Remove connection eventually

// TODO: Make sure the connection can't be spoofed by someone having access to that Outlet

let connection = Arc::new(InletConnection {
identifier: None,
their_identifier,
connection_identifier: random(),
inlet_ip: parsed_packet.destination_ip,
client_ip: src_ip,
Expand All @@ -70,7 +70,7 @@ impl Processor for InternalProcessor {
match &self.mode {
// Client -> Inlet packet
PortalMode::Inlet { inlet } => {
let inlet_shared_state = inlet.inlet_shared_state.read().unwrap().clone();
let inlet_shared_state = inlet.inlet_shared_state.read().await.clone();

if inlet_shared_state.is_paused() {
return Ok(true);
Expand All @@ -86,6 +86,11 @@ impl Processor for InternalProcessor {
parsed_packet.packet.source_ip,
parsed_packet.packet.source
);

if connection.their_identifier != inlet_shared_state.their_identifier() {
return Err(TransportError::IdentifierChanged)?;
}

connection
}
None => {
Expand All @@ -105,6 +110,7 @@ impl Processor for InternalProcessor {
);
Self::new_inlet_connection(
inlet,
inlet_shared_state.their_identifier(),
parsed_packet.packet.source_ip,
&parsed_packet,
)
Expand Down Expand Up @@ -152,7 +158,7 @@ impl Processor for InternalProcessor {
let portal_packet = OckamPortalPacket::from_raw_socket_packet(
parsed_packet.packet,
connection.connection_identifier.clone(),
0, // Doesn't matter for the outlet, as outlet can't update the route
0, // Doesn't matter for the outlet, as outlets can't update the route
);

trace!("Outlet Processor: Got packet, forwarding to the other side");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use core::fmt::Debug;
use log::{debug, error};
use nix::unistd::Uid;
use ockam_core::{Address, DenyAll, Result, Route};
use ockam_node::compat::asynchronous::resolve_peer;
use ockam_node::compat::asynchronous::{resolve_peer, RwLock};
use ockam_node::{ProcessorBuilder, WorkerBuilder};
use ockam_transport_core::{HostnamePort, TransportError};
use std::net::{IpAddr, SocketAddrV4};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::mpsc::channel;
use tracing::instrument;
Expand Down Expand Up @@ -104,10 +104,9 @@ impl TcpTransport {

let write_handle = self.start_raw_socket_processor_if_needed().await?;

let inlet_shared_state = Arc::new(RwLock::new(InletSharedState::new(
false,
outlet_route.clone(),
)));
let inlet_shared_state =
InletSharedState::create(self.ctx(), outlet_route.clone(), false).await?;
let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state));

let remote_worker_address = Address::random_tagged("Ebpf.RemoteWorker.Inlet");
let internal_worker_address = Address::random_tagged("Ebpf.InternalWorker.Inlet");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::ebpf_portal::{ConnectionIdentifier, ParsedRawSocketPacket, Port};
use crate::portal::InletSharedState;
use ockam_core::Address;
use ockam_core::compat::sync::Arc;
use ockam_core::compat::sync::RwLock as SyncRwLock;
use ockam_core::{Address, LocalInfoIdentifier};
use ockam_node::compat::asynchronous::RwLock as AsyncRwLock;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::{Arc, RwLock};
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;

/// Inlet registry
#[derive(Default, Clone)]
pub(crate) struct InletRegistry {
inlets: Arc<RwLock<HashMap<Port, Inlet>>>,
inlets: Arc<SyncRwLock<HashMap<Port, Inlet>>>,
}

impl InletRegistry {
Expand All @@ -29,7 +31,7 @@ impl InletRegistry {
sender: Sender<ParsedRawSocketPacket>,
port: Port,
tcp_listener: TcpListener,
inlet_shared_state: Arc<RwLock<InletSharedState>>,
inlet_shared_state: Arc<AsyncRwLock<InletSharedState>>,
) -> Inlet {
let mut inlets = self.inlets.write().unwrap();

Expand Down Expand Up @@ -69,12 +71,12 @@ pub struct Inlet {
/// Port
pub port: Port,
/// Route to the corresponding Outlet
pub inlet_shared_state: Arc<RwLock<InletSharedState>>,
pub inlet_shared_state: Arc<AsyncRwLock<InletSharedState>>,
/// Hold to mark the port as taken
pub _tcp_listener: Arc<TcpListener>,
/// Same map with different key
connections1: Arc<RwLock<HashMap<InletConnectionKey1, Arc<InletConnection>>>>,
connections2: Arc<RwLock<HashMap<InletConnectionKey2, Arc<InletConnection>>>>,
connections1: Arc<SyncRwLock<HashMap<InletConnectionKey1, Arc<InletConnection>>>>,
connections2: Arc<SyncRwLock<HashMap<InletConnectionKey2, Arc<InletConnection>>>>,
}

impl Inlet {
Expand All @@ -89,7 +91,7 @@ impl Inlet {
);
self.connections2.write().unwrap().insert(
InletConnectionKey2 {
identifier: connection.identifier.clone(),
their_identifier: connection.their_identifier.clone(),
connection_identifier: connection.connection_identifier.clone(),
},
connection,
Expand All @@ -115,14 +117,14 @@ impl Inlet {
/// Get mapping
pub(crate) fn get_connection_external(
&self,
identifier: Option<String>, // Identity
their_identifier: Option<LocalInfoIdentifier>, // Identity
connection_identifier: ConnectionIdentifier,
) -> Option<Arc<InletConnection>> {
self.connections2
.read()
.unwrap()
.get(&InletConnectionKey2 {
identifier,
their_identifier,
connection_identifier,
})
.cloned()
Expand All @@ -137,14 +139,14 @@ struct InletConnectionKey1 {

#[derive(Hash, PartialEq, Eq)]
struct InletConnectionKey2 {
identifier: Option<String>,
their_identifier: Option<LocalInfoIdentifier>,
connection_identifier: ConnectionIdentifier,
}

/// Inlet Mapping
pub struct InletConnection {
/// Identity Identifier of the other side
pub identifier: Option<String>,
pub their_identifier: Option<LocalInfoIdentifier>,
/// Unique connection Identifier
pub connection_identifier: ConnectionIdentifier,
/// We can listen of multiple IPs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::ebpf_portal::{ConnectionIdentifier, ParsedRawSocketPacket, Port};
use ockam_core::{Address, Route};
use ockam_core::{Address, LocalInfoIdentifier, Route};
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Outlet {
.insert(connection.assigned_port, connection.clone());
self.connections2.write().unwrap().insert(
OutletConnectionKey {
identifier: connection.identifier.clone(),
their_identifier: connection.their_identifier.clone(),
connection_identifier: connection.connection_identifier.clone(),
},
connection,
Expand All @@ -109,14 +109,14 @@ impl Outlet {
/// Get mapping
pub(crate) fn get_connection_external(
&self,
identifier: Option<String>, // Identity
their_identifier: Option<LocalInfoIdentifier>, // Identity
connection_identifier: ConnectionIdentifier,
) -> Option<Arc<OutletConnection>> {
self.connections2
.read()
.unwrap()
.get(&OutletConnectionKey {
identifier,
their_identifier,
connection_identifier,
})
.cloned()
Expand All @@ -125,7 +125,7 @@ impl Outlet {

#[derive(Hash, PartialEq, Eq)]
struct OutletConnectionKey {
identifier: Option<String>,
their_identifier: Option<LocalInfoIdentifier>,
connection_identifier: ConnectionIdentifier,
}

Expand All @@ -150,7 +150,7 @@ impl OutletConnectionReturnRoute {
/// Outlet mapping
pub struct OutletConnection {
/// Identity Identifier of the other side
pub identifier: Option<String>,
pub their_identifier: Option<LocalInfoIdentifier>,
/// Unique connection Identifier
pub connection_identifier: ConnectionIdentifier,
/// Assigned port on our machine for a specific connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::ebpf_portal::{
OutletConnectionReturnRoute, Port, TcpTransportEbpfSupport,
};
use log::{debug, trace};
use ockam_core::{async_trait, Any, Result, Route, Routed, Worker};
use ockam_core::{
async_trait, Any, LocalInfoIdentifier, Result, Route, Routed, SecureChannelLocalInfo, Worker,
};
use ockam_node::Context;
use ockam_transport_core::TransportError;
use pnet::packet::tcp::MutableTcpPacket;
Expand Down Expand Up @@ -65,7 +67,7 @@ impl RemoteWorker {
async fn new_outlet_connection(
&self,
outlet: &Outlet,
identifier: Option<String>,
identifier: Option<LocalInfoIdentifier>,
msg: &OckamPortalPacket,
return_route: Route,
) -> Result<Arc<OutletConnection>> {
Expand All @@ -82,7 +84,7 @@ impl RemoteWorker {
debug!("New TCP connection. Assigned socket {}", local_addr);

let connection = Arc::new(OutletConnection {
identifier,
their_identifier: identifier,
connection_identifier: msg.connection_identifier.clone(),
assigned_port,
_tcp_listener: Arc::new(tcp_listener),
Expand Down Expand Up @@ -207,17 +209,20 @@ impl Worker for RemoteWorker {
_ctx: &mut Self::Context,
msg: Routed<Self::Message>,
) -> Result<()> {
let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message())
.map(|l| l.their_identifier())
.ok();
let return_route = msg.return_route();
let payload = msg.into_payload();

let msg: OckamPortalPacket = minicbor::decode(&payload)?;

let identifier = None; // FIXME: Should be the Identifier of the other side

match &self.mode {
// Outlet -> Inlet packet
PortalMode::Inlet { inlet } => {
match inlet.get_connection_external(identifier, msg.connection_identifier.clone()) {
match inlet
.get_connection_external(their_identifier, msg.connection_identifier.clone())
{
Some(connection) => {
self.handle_inlet(inlet, &connection, msg).await?;
}
Expand All @@ -230,9 +235,10 @@ impl Worker for RemoteWorker {
}
// Inlet -> Outlet packet
PortalMode::Outlet { outlet } => {
if let Some(connection) = outlet
.get_connection_external(identifier.clone(), msg.connection_identifier.clone())
{
if let Some(connection) = outlet.get_connection_external(
their_identifier.clone(),
msg.connection_identifier.clone(),
) {
self.handle_outlet(outlet, &connection, msg, return_route)
.await?;

Expand All @@ -243,7 +249,7 @@ impl Worker for RemoteWorker {
const SYN: u8 = 2;
if msg.flags == SYN {
let connection = self
.new_outlet_connection(outlet, identifier, &msg, return_route.clone())
.new_outlet_connection(outlet, their_identifier, &msg, return_route.clone())
.await?;

self.handle_outlet(outlet, &connection, msg, return_route)
Expand Down
Loading

0 comments on commit 5cec5ca

Please sign in to comment.