Skip to content

Commit

Permalink
feat(rust): add route_index
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Oct 29, 2024
1 parent 08c2c22 commit 3a220f5
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ impl Distribution<ConnectionIdentifier> for Standard {
#[rustfmt::skip]
pub struct OckamPortalPacket {
#[n(0)] pub connection_identifier: ConnectionIdentifier,
#[n(1)] pub sequence: u32,
#[n(2)] pub acknowledgement: u32,
#[n(3)] pub data_offset: u8,
#[n(4)] pub reserved: u8,
#[n(5)] pub flags: u8,
#[n(6)] pub window: u16,
#[n(7)] pub urgent_ptr: u16,
#[n(8)] pub options: Vec<TcpOption>,
#[n(9)] pub payload: Vec<u8>,
#[n(1)] pub route_index: u32,
#[n(2)] pub sequence: u32,
#[n(3)] pub acknowledgement: u32,
#[n(4)] pub data_offset: u8,
#[n(5)] pub reserved: u8,
#[n(6)] pub flags: u8,
#[n(7)] pub window: u16,
#[n(8)] pub urgent_ptr: u16,
#[n(9)] pub options: Vec<TcpOption>,
#[n(10)] pub payload: Vec<u8>,
}

#[allow(missing_docs)]
Expand All @@ -66,9 +67,11 @@ impl OckamPortalPacket {
pub fn from_raw_socket_packet(
value: RawSocketPacket,
connection_identifier: ConnectionIdentifier,
route_index: u32,
) -> Self {
Self {
connection_identifier,
route_index,
sequence: value.sequence,
acknowledgement: value.acknowledgement,
data_offset: value.data_offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ impl Processor for InternalProcessor {
};

match &self.mode {
// Client -> Inlet packet
PortalMode::Inlet { inlet } => {
let inlet_shared_state = inlet.inlet_shared_state.read().unwrap().clone();

if inlet_shared_state.is_paused {
if inlet_shared_state.is_paused() {
return Ok(true);
}

Expand Down Expand Up @@ -114,19 +115,21 @@ impl Processor for InternalProcessor {
let portal_packet = OckamPortalPacket::from_raw_socket_packet(
parsed_packet.packet,
connection.connection_identifier.clone(),
inlet_shared_state.route_index(),
);

trace!("Inlet Processor: Got packet, forwarding to the other side");

ctx.forward_from_address(
LocalMessage::new()
.with_onward_route(inlet_shared_state.route)
.with_onward_route(inlet_shared_state.route().clone())
.with_return_route(route![inlet.remote_worker_address.clone()])
.with_payload(minicbor::to_vec(portal_packet)?),
ctx.address(),
)
.await?;
}
// Server -> Outlet packet
PortalMode::Outlet { outlet } => {
let connection =
match outlet.get_connection_internal(parsed_packet.packet.destination) {
Expand All @@ -149,13 +152,16 @@ impl Processor for InternalProcessor {
let portal_packet = OckamPortalPacket::from_raw_socket_packet(
parsed_packet.packet,
connection.connection_identifier.clone(),
0, // Doesn't matter for the outlet, as outlet can't update the route
);

trace!("Outlet Processor: Got packet, forwarding to the other side");

let return_route = connection.return_route.read().unwrap().route.clone();

ctx.forward_from_address(
LocalMessage::new()
.with_onward_route(connection.return_route.clone())
.with_onward_route(return_route)
.with_return_route(route![outlet.remote_worker_address.clone()])
.with_payload(minicbor::to_vec(portal_packet)?),
ctx.address(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ impl TcpTransport {

let write_handle = self.start_raw_socket_processor_if_needed().await?;

let inlet_shared_state = Arc::new(RwLock::new(InletSharedState {
route: outlet_route.clone(),
is_paused: false,
}));
let inlet_shared_state = Arc::new(RwLock::new(InletSharedState::new(
false,
outlet_route.clone(),
)));

let remote_worker_address = Address::random_tagged("Ebpf.RemoteWorker.Inlet");
let internal_worker_address = Address::random_tagged("Ebpf.InternalWorker.Inlet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,24 @@ struct OutletConnectionKey {
connection_identifier: ConnectionIdentifier,
}

/// Updatable return_route to the Inlet (updatable by the Inlet)
pub struct OutletConnectionReturnRoute {
/// Route
pub route: Route,
/// Number of the route. Starts from 0 and Inlet updates it each time.
pub route_index: u32,
}

impl OutletConnectionReturnRoute {
/// Constructor. Route index starts with 0
pub fn new(route: Route) -> Self {
Self {
route,
route_index: 0,
}
}
}

/// Outlet mapping
pub struct OutletConnection {
/// Identity Identifier of the other side
Expand All @@ -138,7 +156,7 @@ pub struct OutletConnection {
/// Assigned port on our machine for a specific connection
pub assigned_port: Port,
/// Route to the other side PortalWorker
pub return_route: Route, // TODO: Update it if the inlet updates the route
pub return_route: Arc<RwLock<OutletConnectionReturnRoute>>,
/// To hold the port
pub _tcp_listener: Arc<TcpListener>,
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::ebpf_portal::{
Inlet, InletConnection, OckamPortalPacket, Outlet, OutletConnection, Port,
TcpTransportEbpfSupport,
Inlet, InletConnection, OckamPortalPacket, Outlet, OutletConnection,
OutletConnectionReturnRoute, Port, TcpTransportEbpfSupport,
};
use log::{debug, trace};
use ockam_core::{async_trait, Any, Result, Route, Routed, Worker};
Expand All @@ -9,7 +9,7 @@ use ockam_transport_core::TransportError;
use pnet::packet::tcp::MutableTcpPacket;
use pnet::transport::TransportSender;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use tokio::net::TcpListener;
use tracing::warn;

Expand Down Expand Up @@ -86,7 +86,7 @@ impl RemoteWorker {
connection_identifier: msg.connection_identifier.clone(),
assigned_port,
_tcp_listener: Arc::new(tcp_listener),
return_route,
return_route: Arc::new(RwLock::new(OutletConnectionReturnRoute::new(return_route))),
});

outlet.add_connection(connection.clone());
Expand Down Expand Up @@ -176,7 +176,17 @@ impl RemoteWorker {
outlet: &Outlet,
connection: &OutletConnection,
msg: OckamPortalPacket,
return_route: Route,
) -> Result<()> {
{
let mut connection_return_route = connection.return_route.write().unwrap();

if connection_return_route.route_index < msg.route_index {
connection_return_route.route_index = msg.route_index;
connection_return_route.route = return_route;
}
}

self.handle(
msg,
connection.assigned_port,
Expand Down Expand Up @@ -205,6 +215,7 @@ impl Worker for RemoteWorker {
let identifier = None; // FIXME: Should be the Identifier of the other side

match &self.mode {
// Outlet -> Inlet packet
PortalMode::Inlet { inlet } => {
match inlet.get_connection_external(identifier, msg.connection_identifier.clone()) {
Some(connection) => {
Expand All @@ -217,21 +228,26 @@ impl Worker for RemoteWorker {

return Ok(());
}
// Inlet -> Outlet packet
PortalMode::Outlet { outlet } => {
if let Some(connection) = outlet
.get_connection_external(identifier.clone(), msg.connection_identifier.clone())
{
self.handle_outlet(outlet, &connection, msg).await?;
self.handle_outlet(outlet, &connection, msg, return_route)
.await?;

return Ok(());
}

if msg.flags == 2 {
// Checks that SYN flag is set, and every other flag is not set.
const SYN: u8 = 2;
if msg.flags == SYN {
let connection = self
.new_outlet_connection(outlet, identifier, &msg, return_route)
.new_outlet_connection(outlet, identifier, &msg, return_route.clone())
.await?;

self.handle_outlet(outlet, &connection, msg).await?;
self.handle_outlet(outlet, &connection, msg, return_route)
.await?;

return Ok(());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,44 @@ use tracing::{debug, error, instrument};
/// from outside the worker: update the route to the outlet or pause it.
#[derive(Debug, Clone)]
pub struct InletSharedState {
pub route: Route,
pub is_paused: bool,
route: Route,
is_paused: bool,
// Starts with 0 and increments each time when inlet updates the route to the outlet
// (e.g. when reconnecting), this will allow outlet to figure out what is the most recent
// return_route even if messages arrive out-of-order
route_index: u32,
}

impl InletSharedState {
pub fn route(&self) -> &Route {
&self.route
}

pub fn update_route(&mut self, new_route: Route) {
self.route = new_route;
// Overflow here is very unlikely...
self.route_index += 1;
}

pub fn is_paused(&self) -> bool {
self.is_paused
}

pub fn set_is_paused(&mut self, is_paused: bool) {
self.is_paused = is_paused;
}

pub fn route_index(&self) -> u32 {
self.route_index
}

pub fn new(is_paused: bool, route: Route) -> Self {
Self {
route,
is_paused,
route_index: 0,
}
}
}

/// A TCP Portal Inlet listen processor
Expand Down Expand Up @@ -76,6 +112,7 @@ impl TcpInletListenProcessor {
let inlet_shared_state = InletSharedState {
route: outlet_listener_route,
is_paused: options.is_paused,
route_index: 0,
};
let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state));
let processor = Self::new(registry, inner, inlet_shared_state.clone(), options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum PortalMessage<'de> {
/// or from the target to the Inlet was dropped
Disconnect,
/// Message with binary payload and packet counter
// TODO: Add route_index. May not be as important as for eBPF portals, as regular portals
// require reliable channel anyways. And if PortalMessage is sent over a channel that
// guarantees ordering, we don't need route_index
Payload(&'de [u8], Option<u16>),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ impl TcpInlet {
) -> Result<()> {
let mut inlet_shared_state = self.inlet_shared_state.write().unwrap();

let new_route = Self::build_new_full_route(new_route, &inlet_shared_state.route)?;
let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?;
let next = new_route.next()?.clone();
inlet_shared_state.route = new_route;
inlet_shared_state.update_route(new_route);

self.update_flow_controls(flow_controls, next);

Expand All @@ -235,7 +235,7 @@ impl TcpInlet {
pub fn pause(&self) {
let mut inlet_shared_state = self.inlet_shared_state.write().unwrap();

inlet_shared_state.is_paused = true;
inlet_shared_state.set_is_paused(true);
}

fn update_flow_controls(&self, flow_controls: &FlowControls, next: Address) {
Expand All @@ -257,12 +257,12 @@ impl TcpInlet {
pub fn unpause(&self, flow_controls: &FlowControls, new_route: Route) -> Result<()> {
let mut inlet_shared_state = self.inlet_shared_state.write().unwrap();

let new_route = Self::build_new_full_route(new_route, &inlet_shared_state.route)?;
let new_route = Self::build_new_full_route(new_route, inlet_shared_state.route())?;
let next = new_route.next()?.clone();

inlet_shared_state.route =
Self::build_new_full_route(new_route, &inlet_shared_state.route)?;
inlet_shared_state.is_paused = false;
inlet_shared_state.update_route(new_route);
inlet_shared_state.set_is_paused(false);

self.update_flow_controls(flow_controls, next);

Ok(())
Expand Down

0 comments on commit 3a220f5

Please sign in to comment.