Skip to content

Commit

Permalink
add connection to trait
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jan 31, 2024
1 parent 86b71f1 commit 86a1702
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 29 deletions.
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ tokio.workspace = true
jf-primitives.workspace = true
ark-serialize = "0.4.2"
rand.workspace = true
parking_lot = "0.12.1"
parking_lot = "0.12.1"
url = "2.5.0"
4 changes: 0 additions & 4 deletions proto/src/address_source.rs

This file was deleted.

54 changes: 53 additions & 1 deletion proto/src/connection/fallible/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! connection that implements our message framing and connection
//! logic.

use quinn::Endpoint;

use crate::{
bail,
connection::Connection,
Expand All @@ -10,7 +12,7 @@ use crate::{
MAX_MESSAGE_SIZE,
};
use core::hash::Hash;
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

/// `Fallible` is a thin wrapper around `quinn::Connection` that implements
/// `Connection`.
Expand Down Expand Up @@ -113,4 +115,54 @@ impl Connection for Fallible {
"failed to finish stream"
))
}

/// Connect to a remote endpoint, returning an instance of `Self`. With QUIC,
/// this requires creating an endpoint, binding to it, and then attempting
/// a connection.
///
/// # Errors
/// Errors if we fail to connect or if we fail to bind to the interface we want.
async fn connect(remote_endpoint: String) -> Result<Self>
where
Self: Sized,
{
// Parse the socket address
let remote_address = bail!(
remote_endpoint.parse(),
Parse,
"failed to parse remote endpoint"
);

// Parse host for certificate. We need this to ensure that the
// TLS cert matches what the server is providing.
let domain_name = bail!(
url::Host::parse(&remote_endpoint),
Parse,
"failed to parse host from remote endpoint"
)
.to_string();

// Create QUIC endpoint
let endpoint = bail!(
Endpoint::client(bail!(
"0.0.0.0:0".parse(),
Parse,
"failed to parse local bind address"
)),
Connection,
"failed to bind to local address"
);

// Connect with QUIC endpoint to remote address
Ok(Self(bail!(
bail!(
endpoint.connect(remote_address, &domain_name),
Connection,
"failed to connect to remote address"
)
.await,
Connection,
"failed to connect to remote address"
)))
}
}
48 changes: 46 additions & 2 deletions proto/src/connection/fallible/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpSocket,
},
sync::Mutex,
};

Expand All @@ -15,7 +18,7 @@ use crate::{
message::Message,
MAX_MESSAGE_SIZE,
};
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

/// `Fallible` is a thin wrapper around `OwnedReadHalf` and `OwnedWriteHalf` that implements
/// `Connection`.
Expand Down Expand Up @@ -102,4 +105,45 @@ impl Connection for Fallible {

Ok(())
}

/// Connect to a remote endpoint, returning an instance of `Self`.
/// With TCP, this requires just connecting to the remote endpoint.
///
/// # Errors
/// Errors if we fail to connect or if we fail to bind to the interface we want.
async fn connect(remote_endpoint: String) -> Result<Self>
where
Self: Sized,
{
// Parse the socket address
let remote_address = bail!(
remote_endpoint.parse(),
Parse,
"failed to parse remote endpoint"
);

// Create a new TCP socket
let socket = bail!(
TcpSocket::new_v4(),
Connection,
"failed to bind to local socket"
);

// Connect the stream to the local socket
let stream = bail!(
socket.connect(remote_address).await,
Connection,
"failed to connect to remote address"
);

// Split the connection into a `ReadHalf` and `WriteHalf` so we can operate
// concurrently over both
let (read_half, write_half) = stream.into_split();

// `Mutex` and `Arc` each side
Ok(Self {
receiver: Arc::from(Mutex::from(read_half)),
sender: Arc::from(Mutex::from(write_half)),
})
}
}
13 changes: 10 additions & 3 deletions proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

use crate::{error::Result, message::Message};

pub mod fallible;
pub mod sticky;

pub trait Connection{
pub trait Connection {
/// Receive a single message from the connection.
///
/// # Errors
Expand All @@ -17,5 +17,12 @@ pub trait Connection{
/// # Errors
/// Errors if we fail to deliver the message. This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()>;
}

/// Connect to a remote address, returning an instance of `Self`.
///
/// # Errors
/// Errors if we fail to connect or if we fail to bind to the interface we want.
async fn connect(remote_endpoint: String) -> Result<Self>
where
Self: Sized;
}
84 changes: 79 additions & 5 deletions proto/src/connection/sticky.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
use parking_lot::Mutex;
use tokio::sync::{OnceCell, RwLock};

use crate::messages_capnp::Topic;
use crate::{error::Result, message::Topic};

use super::Connection as ProtoConnection;

Expand All @@ -31,12 +31,19 @@ pub struct Sticky<SignatureScheme: JfSignatureScheme, Connection: ProtoConnectio
struct StickyInner<SignatureScheme: JfSignatureScheme, Connection: ProtoConnection> {
/// This is the remote address that we authenticate to. It can either be a broker
/// or a marshal. The authentication flow depends on the function defined in
/// `auth_flow`.
address: SocketAddr,
/// `auth_flow`, so this may not be the final address (e.g. if you are asking the
/// marshal for it).
remote_address: String,

/// The authentication flow that is followed when connecting. Uses the `SocketAddr`
/// as the endpoint for the connection.
auth_flow: fn(SocketAddr, Connection) -> Connection,
/// as the endpoint for the connection. Takes the signing and verification keys
/// in case they are needed.
auth_flow: fn(
SocketAddr,
Connection,
SignatureScheme::SigningKey,
SignatureScheme::VerificationKey,
) -> Connection,

/// A list of topics we are subscribed to. This allows us to, when reconnecting,
/// easily provide the list of topics we care about.
Expand All @@ -53,3 +60,70 @@ struct StickyInner<SignatureScheme: JfSignatureScheme, Connection: ProtoConnecti
/// authentication phase.
signing_key: SignatureScheme::SigningKey,
}

/// The configuration needed to construct a client
pub struct Config<SignatureScheme: JfSignatureScheme, Connection: ProtoConnection> {
/// The verification (public) key. Sent to the server to verify
/// our identity.
pub verification_key: SignatureScheme::VerificationKey,

/// The signing (private) key. Only used for signing the authentication
/// message sent to the server upon connection.
pub signing_key: SignatureScheme::SigningKey,

/// The remote address(es) to connect and authenticate to.
pub remote_address: String,

/// The topics we want to be subscribed to initially. This is needed so that
/// we can resubscribe to the same topics upon reconnection.
pub initial_subscribed_topics: Vec<Topic>,

/// The authentication flow that is followed when connecting. Uses the `SocketAddr`
/// as the endpoint for the connection. Takes the signing and verification keys
/// in case they are needed.
auth_flow: fn(
SocketAddr,
Connection,
SignatureScheme::SigningKey,
SignatureScheme::VerificationKey,
) -> Connection,
}

impl<SignatureScheme: JfSignatureScheme, Connection: ProtoConnection>
Sticky<SignatureScheme, Connection>
{
/// Creates a new `Sticky` connection from a `Config` and an (optional) pre-existing
/// `Fallible` connection.
///
/// This allows us to create elastic clients that always try to maintain a connection
/// with each other.
///
/// # Errors
/// Errors if we are unable to either parse or bind an endpoint to the local address.
pub fn from_config_and_connection(
config: Config<SignatureScheme, Connection>,
connection: OnceCell<Connection>,
) -> Result<Self> {
// Extrapolate values from the underlying client configuration
let Config {
auth_flow,
verification_key,
signing_key,
remote_address,
initial_subscribed_topics,
} = config;

// Return the slightly transformed connection.
Ok(Self {
inner: Arc::from(StickyInner {
remote_address,
subscribed_topics: Mutex::from(HashSet::from_iter(initial_subscribed_topics)),
// Use the existing connection
connection: RwLock::from(connection),
signing_key,
verification_key,
auth_flow,
}),
})
}
}
Loading

0 comments on commit 86a1702

Please sign in to comment.