From 4d217afd74b81e581fd056bb8a207197193ddb77 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 1 Feb 2024 13:16:40 -0500 Subject: [PATCH] generic over connection method --- Cargo.lock | 1 + Cargo.toml | 1 + client/src/lib.rs | 77 +++++++++---------- proto/Cargo.toml | 3 +- proto/schema/messages.capnp | 5 +- proto/src/connection/auth.rs | 3 - proto/src/connection/fallible/quic.rs | 4 +- proto/src/connection/fallible/tcp.rs | 4 +- proto/src/connection/flow.rs | 102 +++++++++++++++++++++++++ proto/src/connection/mod.rs | 3 +- proto/src/connection/sticky.rs | 103 +++++++++++--------------- proto/src/message.rs | 23 +++--- 12 files changed, 203 insertions(+), 126 deletions(-) delete mode 100644 proto/src/connection/auth.rs create mode 100644 proto/src/connection/flow.rs diff --git a/Cargo.lock b/Cargo.lock index 1e489e9..012bb2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1081,6 +1081,7 @@ dependencies = [ "rand", "thiserror", "tokio", + "tracing", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 71461e7..cd58957 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ tokio = { version = "1.35.1", features = ["full"] } jf-primitives.git = "https://github.com/EspressoSystems/jellyfish.git" rand = "0.8.5" ark-serialize = "0.4.2" +tracing = "0.1.40" \ No newline at end of file diff --git a/client/src/lib.rs b/client/src/lib.rs index f982555..a10a16d 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -7,26 +7,34 @@ use ark_serialize::{CanonicalDeserialize, CanonicalSerialize}; use jf_primitives::signatures::SignatureScheme as JfSignatureScheme; use proto::{ bail, - connection::{sticky::Sticky, Connection as ProtoConnection}, + connection::{ + flow::Flow, + sticky::{self, Sticky}, + Connection, + }, crypto, error::Error, error::Result, message::{Broadcast, Direct, Message, Subscribe, Topic, Unsubscribe}, }; -use tokio::sync::OnceCell; /// `Client` is a light wrapper around a `Sticky` connection that provides functions /// for common operations to and from a server. Mostly just used to make the API /// more ergonomic. -pub struct Client( - Sticky, -); +pub struct Client< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, +>(Sticky); -pub type Config = - proto::connection::sticky::Config; +pub type Config = + sticky::Config; -impl - Client +impl< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, + > Client where SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize, SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize, @@ -39,8 +47,10 @@ where /// # Errors /// Errors if the downstream `Sticky` object was unable to be made. /// This usually happens when we can't bind to the specified endpoint. - pub fn new(config: Config) -> Result { - Self::new_with_connection(config, OnceCell::new()) + pub async fn new( + config: Config, + ) -> Result { + Self::new_with_connection(config, Option::None).await } /// Creates a new client from the given `Config` and an optional `Connection`. @@ -48,14 +58,14 @@ where /// light wrapper. /// /// # Errors - /// Errors if the downstream `Sticky` object was unable to be made. + /// Errors if the downstream `Sticky` object was unable to be created. /// This usually happens when we can't bind to the specified endpoint. - pub fn new_with_connection( - config: Config, - connection: OnceCell, + pub async fn new_with_connection( + config: Config, + connection: Option, ) -> Result { Ok(Client(bail!( - Sticky::from_config_and_connection(config, connection), + Sticky::from_config_and_connection(config, connection).await, Connection, "failed to create client" ))) @@ -74,7 +84,7 @@ where } /// Sends a pre-serialized message to the server, denoting recipients in the form - /// of a vector of topics. Use `send_formed_message` when the message is already + /// of a vector of topics. Use `send_message_raw` when the message is already /// formed. If it fails, we return an error but try to initiate a new connection /// in the background. /// @@ -83,18 +93,12 @@ where pub async fn send_broadcast_message(&self, topics: Vec, message: Vec) -> Result<()> { // TODO: conditionally match error on whether deserialization OR the connection failed // Form and send the single message - bail!( - self.send_formed_message(Arc::from(Message::Broadcast(Broadcast { topics, message }))) - .await, - Connection, - "failed to send message" - ); Ok(()) } /// Sends a pre-serialized message to the server, denoting interest in delivery - /// to a single recipient. Use `send_formed_message` when the message is already formed. + /// to a single recipient. Use `send_message_raw` when the message is already formed. /// /// # Errors /// If the connection or serialization has failed @@ -111,13 +115,6 @@ where "failed to serialize recipient" ); - // Send the message with the serialized recipient - self.send_formed_message(Arc::from(Message::Direct(Direct { - recipient: recipient_bytes, - message, - }))) - .await; - Ok(()) } @@ -126,9 +123,8 @@ where /// /// # Errors /// If the connection or serialization has failed - pub async fn subscribe(&self, topics: Vec) { - self.send_formed_message(Arc::from(Message::Subscribe(Subscribe { topics }))) - .await; + pub async fn subscribe(&self, topics: Vec) -> Result<()> { + todo!() } /// Sends a message to the server that asserts that this client is no longer @@ -136,18 +132,13 @@ where /// /// # Errors /// If the connection or serialization has failed - pub async fn unsubscribe(&self, topics: Vec) { - self.send_formed_message(Arc::from(Message::Unsubscribe(Unsubscribe { topics }))) - .await; + pub async fn unsubscribe(&self, topics: Vec) -> Result<()> { + todo!() } /// Sends a pre-formed message over the wire. Various functions make use /// of this one downstream. - /// - /// TODO: make this generic over a borrowed message so we can pass in either - /// a reference or an Arc to the object itself - pub async fn send_formed_message(&self, message: Arc) -> Result<()> { - // self.0.send_message(message).await; - todo!() + pub async fn send_message_raw>(&self, message: M) -> Result<()> { + self.0.send_message(message).await } } diff --git a/proto/Cargo.toml b/proto/Cargo.toml index c0b1ced..bd56d5b 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -16,4 +16,5 @@ jf-primitives.workspace = true ark-serialize.workspace = true rand.workspace = true parking_lot = "0.12.1" -url = "2.5.0" \ No newline at end of file +url = "2.5.0" +tracing.workspace = true \ No newline at end of file diff --git a/proto/schema/messages.capnp b/proto/schema/messages.capnp index 5cb81cd..116d4f9 100644 --- a/proto/schema/messages.capnp +++ b/proto/schema/messages.capnp @@ -59,8 +59,9 @@ struct AuthenticateWithPermit { struct AuthenticateResponse { # The permit. Sent from marshals to clients to verify authentication. permit @0: UInt64; - # The reason authentication was unsuccessful, if applicable - reason @1: Text; + # The message context. Is an error reason if failed, or the endpoint + # address if successful. + context @1: Text; } # This message is a direct message. It is sent by a client, used to deliver a diff --git a/proto/src/connection/auth.rs b/proto/src/connection/auth.rs deleted file mode 100644 index 7e75ba0..0000000 --- a/proto/src/connection/auth.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! This module defines common authentication methods that we may want to use -//! as a client or as a broker. - diff --git a/proto/src/connection/fallible/quic.rs b/proto/src/connection/fallible/quic.rs index 952d41f..bc2698b 100644 --- a/proto/src/connection/fallible/quic.rs +++ b/proto/src/connection/fallible/quic.rs @@ -85,7 +85,7 @@ impl Connection for Fallible { /// # Errors /// Errors if we either failed to open the stream or send the message over that stream. /// This usually means a connection problem. - async fn send_message(&self, message: Arc) -> Result<()> { + async fn send_message>(&self, message: M) -> Result<()> { // Open the outgoing unidirectional stream let mut stream = bail!( self.0.open_uni().await, @@ -95,7 +95,7 @@ impl Connection for Fallible { // Serialize the message let message_bytes = bail!( - message.serialize(), + message.as_ref().serialize(), Serialize, "failed to serialize message" ); diff --git a/proto/src/connection/fallible/tcp.rs b/proto/src/connection/fallible/tcp.rs index 8a3fe8f..20e2ef9 100644 --- a/proto/src/connection/fallible/tcp.rs +++ b/proto/src/connection/fallible/tcp.rs @@ -76,13 +76,13 @@ impl Connection for Fallible { /// # Errors /// Errors if we either failed to open the stream or send the message over that stream. /// This usually means a connection problem. - async fn send_message(&self, message: Arc) -> Result<()> { + async fn send_message>(&self, message: M) -> Result<()> { // Lock the stream so we don't send message/message sizes interleaved let mut sender_guard = self.sender.lock().await; // Serialize the message let serialized_message = bail!( - message.serialize(), + message.as_ref().serialize(), Serialize, "failed to serialize message" ); diff --git a/proto/src/connection/flow.rs b/proto/src/connection/flow.rs new file mode 100644 index 0000000..4bc0cf5 --- /dev/null +++ b/proto/src/connection/flow.rs @@ -0,0 +1,102 @@ +//! This file defines the connection and authentication flows +//! to be used when connecting and reconnecting. + +use std::time::{SystemTime, UNIX_EPOCH}; + +use ark_serialize::{CanonicalDeserialize, CanonicalSerialize}; +use jf_primitives::signatures::SignatureScheme as JfSignatureScheme; + +use crate::{ + bail, + connection::Connection, + crypto::{self, DeterministicRng}, + error::{Error, Result}, + message::{AuthenticateWithKey, Message}, +}; + +pub trait Flow< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, +> +{ + async fn connect( + endpoint: String, + signing_key: &SignatureScheme::SigningKey, + verification_key: &SignatureScheme::VerificationKey, + ) -> Result; +} + + +pub struct ToMarshal {} + +impl< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + > Flow for ToMarshal +where + SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize, + SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize, + SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize, +{ + async fn connect( + endpoint: String, + signing_key: &SignatureScheme::SigningKey, + verification_key: &SignatureScheme::VerificationKey, + ) -> Result { + let connection = bail!( + ConnectionType::connect(endpoint).await, + Connection, + "failed to connect to remote" + ); + + // Get the current timestamp, which we sign to avoid replay attacks + let timestamp = bail!( + SystemTime::now().duration_since(UNIX_EPOCH), + Parse, + "failed to get timestamp: time went backwards" + ) + .as_secs(); + + // Sign the timestamp from above + let signature = bail!( + SignatureScheme::sign( + &(), + &signing_key, + timestamp.to_le_bytes(), + &mut DeterministicRng(0), + ), + Crypto, + "failed to sign message" + ); + + // Serialize the verify key + let verification_key_bytes = bail!( + crypto::serialize(verification_key), + Serialize, + "failed to serialize verification key" + ); + + // Serialize the signature + let signature_bytes = bail!( + crypto::serialize(&signature), + Serialize, + "failed to serialize signature" + ); + + let message = Message::AuthenticateWithKey(AuthenticateWithKey { + timestamp, + verification_key: verification_key_bytes, + signature: signature_bytes, + }); + + // Create and send the authentication message from the above operations + // bail!( + // connection.send_message(message).await, + // Connection, + // "failed to send message" + // ); + + // Ok(connection) + todo!() + } +} diff --git a/proto/src/connection/mod.rs b/proto/src/connection/mod.rs index 5fd061e..ef0b264 100644 --- a/proto/src/connection/mod.rs +++ b/proto/src/connection/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::{error::Result, message::Message}; pub mod fallible; +pub mod flow; pub mod sticky; pub trait Connection { @@ -16,7 +17,7 @@ pub trait Connection { /// /// # Errors /// Errors if we fail to deliver the message. This usually means a connection problem. - async fn send_message(&self, message: Arc) -> Result<()>; + async fn send_message>(&self, message: M) -> Result<()>; /// Connect to a remote address, returning an instance of `Self`. /// diff --git a/proto/src/connection/sticky.rs b/proto/src/connection/sticky.rs index f0d365b..76f2a93 100644 --- a/proto/src/connection/sticky.rs +++ b/proto/src/connection/sticky.rs @@ -1,11 +1,11 @@ //! This file provides a `Sticky` connection, which allows for reconnections //! on top of a normal implementation of a `Fallible` connection. -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use jf_primitives::signatures::SignatureScheme as JfSignatureScheme; use parking_lot::Mutex; -use tokio::sync::{OnceCell, RwLock}; +use tokio::sync::RwLock; use crate::{ bail, @@ -13,7 +13,7 @@ use crate::{ message::{Message, Topic}, }; -use super::Connection as ProtoConnection; +use super::{flow::Flow, Connection}; /// `Sticky` is a wrapper around a `Fallible` connection. /// @@ -22,35 +22,33 @@ use super::Connection as ProtoConnection; /// /// Can be cloned to provide a handle to the same underlying elastic connection. #[derive(Clone)] -pub struct Sticky { - inner: Arc>, +pub struct Sticky< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, +> { + inner: Arc>, } /// `StickyInner` is held exclusively by `Sticky`, wherein an `Arc` is used /// to facilitate interior mutability. -struct StickyInner { +struct StickyInner< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, +> { /// This is the remote address that we authenticate to. It can either be a broker /// or a marshal. The authentication flow depends on the function defined in /// `auth_flow`, so this may not be the final address (e.g. if you are asking the /// marshal for it). remote_address: String, - /// The authentication flow that is followed when connecting. Uses the `SocketAddr` - /// as the endpoint for the connection. Takes the signing and verification keys - /// in case they are needed. - auth_flow: fn( - SocketAddr, - Connection, - SignatureScheme::SigningKey, - SignatureScheme::VerificationKey, - ) -> Connection, - /// A list of topics we are subscribed to. This allows us to, when reconnecting, /// easily provide the list of topics we care about. subscribed_topics: Mutex>, /// The underlying connection, which we modify to facilitate reconnections. - connection: RwLock>, + connection: RwLock, /// The underlying (public) verification key, used to authenticate with the server. Checked against the stake /// table. @@ -59,10 +57,16 @@ struct StickyInner, } /// The configuration needed to construct a client -pub struct Config { +pub struct Config< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, +> { /// The verification (public) key. Sent to the server to verify /// our identity. pub verification_key: SignatureScheme::VerificationKey, @@ -78,19 +82,14 @@ pub struct Config, - /// The authentication flow that is followed when connecting. Uses the `SocketAddr` - /// as the endpoint for the connection. Takes the signing and verification keys - /// in case they are needed. - pub auth_flow: fn( - SocketAddr, - Connection, - SignatureScheme::SigningKey, - SignatureScheme::VerificationKey, - ) -> Connection, + _pd: PhantomData<(ConnectionType, ConnectionFlow)>, } -impl - Sticky +impl< + SignatureScheme: JfSignatureScheme, + ConnectionType: Connection, + ConnectionFlow: Flow, + > Sticky { /// Creates a new `Sticky` connection from a `Config` and an (optional) pre-existing /// `Fallible` connection. @@ -100,19 +99,25 @@ impl /// /// # Errors /// Errors if we are unable to either parse or bind an endpoint to the local address. - pub fn from_config_and_connection( - config: Config, - connection: OnceCell, + pub async fn from_config_and_connection( + config: Config, + maybe_connection: Option, ) -> Result { // Extrapolate values from the underlying client configuration let Config { - auth_flow, verification_key, signing_key, remote_address, initial_subscribed_topics, + _pd, } = config; + let connection = bail!( + ConnectionFlow::connect(remote_address.clone(), &signing_key, &verification_key).await, + Connection, + "failed to make initial connection" + ); + // Return the slightly transformed connection. Ok(Self { inner: Arc::from(StickyInner { @@ -122,7 +127,7 @@ impl connection: RwLock::from(connection), signing_key, verification_key, - auth_flow, + _pd, }), }) } @@ -130,33 +135,9 @@ impl /// Sends a message to the underlying fallible connection. Retry logic is handled in /// the macro `retry_on_error` where it is conditionally propagated /// to `wait_connect()`. - pub async fn send_message(&self, message: Arc) -> Result<()> { - // TODO: check if this makes sense to bail, or can we just return - // the downstream error - - // TODO: clean up clean up, possible macro !!!! - // we did this because there is a temporary borrowed value, but there HAS - // to be a better way. - // bail!( - // bail!( - // self.inner - // .connection - // .read() - // .await - // .get_or_try_init(|| async { - // Connection::connect(self.inner.remote_address.clone()).await - // }) - // .await, - // Connection, - // "failed to connect" - // ) - // .send_message(message) - // .await, - // Connection, - // "failed to send message" - // ); - - todo!(); + pub async fn send_message>(&self, message: M) -> Result<()> { + // Acquire the connection from the underlying `Option` + let read_guard = self.inner.connection.read().await; Ok(()) } diff --git a/proto/src/message.rs b/proto/src/message.rs index 147507e..bfefd15 100644 --- a/proto/src/message.rs +++ b/proto/src/message.rs @@ -119,7 +119,7 @@ impl Message { // Set each field message.set_permit(to_serialize.permit); - message.set_reason(to_serialize.reason.clone()); + message.set_context(to_serialize.context.clone()); } Self::Broadcast(to_serialize) => { @@ -232,7 +232,7 @@ impl Message { Self::AuthenticateResponse(AuthenticateResponse { permit: deserialize!(message.get_permit()), - reason: deserialize!(message.get_reason(), String), + context: deserialize!(message.get_context(), String), }) } messages_capnp::message::Direct(maybe_message) => { @@ -314,11 +314,11 @@ impl From for messages_capnp::Topic { #[derive(Eq, PartialEq)] pub struct AuthenticateWithKey { // The verification key, used downstream against the signed timestamp to verify the sender. - verification_key: Vec, + pub verification_key: Vec, // The timestamp, unsigned. This is signed by the client to prevent replay attacks. - timestamp: u64, + pub timestamp: u64, // The signature, which is the timestamp, but signed. - signature: Vec, + pub signature: Vec, } /// This message is used to authenticate the client to a server. It contains the permit @@ -326,18 +326,19 @@ pub struct AuthenticateWithKey { #[derive(Eq, PartialEq)] pub struct AuthenticateWithPermit { // The permit issued by the marshal, if applicable. - permit: u64, + pub permit: u64, } /// This message is sent to the client or broker upon authentication. It contains -/// if it was successful or not, the reason, and the permit, if applicable. +/// if it was successful or not, the context, and the permit, if applicable. #[derive(Eq, PartialEq)] pub struct AuthenticateResponse { // The permit. Sent from marshals to clients to verify authentication. Is `0` // if failed, `1` if successful, and neither if it is an actual permit. - permit: u64, - // The reason authentication was unsuccessful, if applicable - reason: String, + pub permit: u64, + // The message context. Is an error reason if failed, or the endpoint + // address if successful. + pub context: String, } /// This message is a direct message. It is sent by a client, used to deliver a @@ -412,7 +413,7 @@ mod test { // `AuthenticateResponse` message assert_serialize_deserialize!(Message::AuthenticateResponse(AuthenticateResponse { permit: 1234, - reason: "1234".to_string(), + context: "1234".to_string(), })); // `Direct` message