Skip to content

Commit

Permalink
generic over connection method
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 1, 2024
1 parent d47ea3d commit 4d217af
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 126 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ tokio = { version = "1.35.1", features = ["full"] }
jf-primitives.git = "https://github.com/EspressoSystems/jellyfish.git"
rand = "0.8.5"
ark-serialize = "0.4.2"
tracing = "0.1.40"
77 changes: 34 additions & 43 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,34 @@ use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
use proto::{
bail,
connection::{sticky::Sticky, Connection as ProtoConnection},
connection::{
flow::Flow,
sticky::{self, Sticky},
Connection,
},
crypto,
error::Error,
error::Result,
message::{Broadcast, Direct, Message, Subscribe, Topic, Unsubscribe},
};
use tokio::sync::OnceCell;

/// `Client` is a light wrapper around a `Sticky` connection that provides functions
/// for common operations to and from a server. Mostly just used to make the API
/// more ergonomic.
pub struct Client<SignatureScheme: JfSignatureScheme, Connection: ProtoConnection>(
Sticky<SignatureScheme, Connection>,
);
pub struct Client<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
ConnectionFlow: Flow<SignatureScheme, ConnectionType>,
>(Sticky<SignatureScheme, ConnectionType, ConnectionFlow>);

pub type Config<SignatureScheme, Connection> =
proto::connection::sticky::Config<SignatureScheme, Connection>;
pub type Config<SignatureScheme, ConnectionType, ConnectionFlow> =
sticky::Config<SignatureScheme, ConnectionType, ConnectionFlow>;

impl<SignatureScheme: JfSignatureScheme, Connection: ProtoConnection>
Client<SignatureScheme, Connection>
impl<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
ConnectionFlow: Flow<SignatureScheme, ConnectionType>,
> Client<SignatureScheme, ConnectionType, ConnectionFlow>
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
Expand All @@ -39,23 +47,25 @@ where
/// # Errors
/// Errors if the downstream `Sticky` object was unable to be made.
/// This usually happens when we can't bind to the specified endpoint.
pub fn new(config: Config<SignatureScheme, Connection>) -> Result<Self> {
Self::new_with_connection(config, OnceCell::new())
pub async fn new(
config: Config<SignatureScheme, ConnectionType, ConnectionFlow>,
) -> Result<Self> {
Self::new_with_connection(config, Option::None).await
}

/// Creates a new client from the given `Config` and an optional `Connection`.
/// Proxies the config to the `Sticky` constructor since a `Client` is just a
/// light wrapper.
///
/// # Errors
/// Errors if the downstream `Sticky` object was unable to be made.
/// Errors if the downstream `Sticky` object was unable to be created.
/// This usually happens when we can't bind to the specified endpoint.
pub fn new_with_connection(
config: Config<SignatureScheme, Connection>,
connection: OnceCell<Connection>,
pub async fn new_with_connection(
config: Config<SignatureScheme, ConnectionType, ConnectionFlow>,
connection: Option<ConnectionType>,
) -> Result<Self> {
Ok(Client(bail!(
Sticky::from_config_and_connection(config, connection),
Sticky::from_config_and_connection(config, connection).await,
Connection,
"failed to create client"
)))
Expand All @@ -74,7 +84,7 @@ where
}

/// Sends a pre-serialized message to the server, denoting recipients in the form
/// of a vector of topics. Use `send_formed_message` when the message is already
/// of a vector of topics. Use `send_message_raw` when the message is already
/// formed. If it fails, we return an error but try to initiate a new connection
/// in the background.
///
Expand All @@ -83,18 +93,12 @@ where
pub async fn send_broadcast_message(&self, topics: Vec<Topic>, message: Vec<u8>) -> Result<()> {
// TODO: conditionally match error on whether deserialization OR the connection failed
// Form and send the single message
bail!(
self.send_formed_message(Arc::from(Message::Broadcast(Broadcast { topics, message })))
.await,
Connection,
"failed to send message"
);

Ok(())
}

/// Sends a pre-serialized message to the server, denoting interest in delivery
/// to a single recipient. Use `send_formed_message` when the message is already formed.
/// to a single recipient. Use `send_message_raw` when the message is already formed.
///
/// # Errors
/// If the connection or serialization has failed
Expand All @@ -111,13 +115,6 @@ where
"failed to serialize recipient"
);

// Send the message with the serialized recipient
self.send_formed_message(Arc::from(Message::Direct(Direct {
recipient: recipient_bytes,
message,
})))
.await;

Ok(())
}

Expand All @@ -126,28 +123,22 @@ where
///
/// # Errors
/// If the connection or serialization has failed
pub async fn subscribe(&self, topics: Vec<Topic>) {
self.send_formed_message(Arc::from(Message::Subscribe(Subscribe { topics })))
.await;
pub async fn subscribe(&self, topics: Vec<Topic>) -> Result<()> {
todo!()
}

/// Sends a message to the server that asserts that this client is no longer
/// interested in a specific topic
///
/// # Errors
/// If the connection or serialization has failed
pub async fn unsubscribe(&self, topics: Vec<Topic>) {
self.send_formed_message(Arc::from(Message::Unsubscribe(Unsubscribe { topics })))
.await;
pub async fn unsubscribe(&self, topics: Vec<Topic>) -> Result<()> {
todo!()
}

/// Sends a pre-formed message over the wire. Various functions make use
/// of this one downstream.
///
/// TODO: make this generic over a borrowed message so we can pass in either
/// a reference or an Arc to the object itself
pub async fn send_formed_message(&self, message: Arc<Message>) -> Result<()> {
// self.0.send_message(message).await;
todo!()
pub async fn send_message_raw<M: AsRef<Message>>(&self, message: M) -> Result<()> {
self.0.send_message(message).await
}
}
3 changes: 2 additions & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ jf-primitives.workspace = true
ark-serialize.workspace = true
rand.workspace = true
parking_lot = "0.12.1"
url = "2.5.0"
url = "2.5.0"
tracing.workspace = true
5 changes: 3 additions & 2 deletions proto/schema/messages.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ struct AuthenticateWithPermit {
struct AuthenticateResponse {
# The permit. Sent from marshals to clients to verify authentication.
permit @0: UInt64;
# The reason authentication was unsuccessful, if applicable
reason @1: Text;
# The message context. Is an error reason if failed, or the endpoint
# address if successful.
context @1: Text;
}

# This message is a direct message. It is sent by a client, used to deliver a
Expand Down
3 changes: 0 additions & 3 deletions proto/src/connection/auth.rs

This file was deleted.

4 changes: 2 additions & 2 deletions proto/src/connection/fallible/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Connection for Fallible {
/// # Errors
/// Errors if we either failed to open the stream or send the message over that stream.
/// This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()> {
async fn send_message<M: AsRef<Message>>(&self, message: M) -> Result<()> {
// Open the outgoing unidirectional stream
let mut stream = bail!(
self.0.open_uni().await,
Expand All @@ -95,7 +95,7 @@ impl Connection for Fallible {

// Serialize the message
let message_bytes = bail!(
message.serialize(),
message.as_ref().serialize(),
Serialize,
"failed to serialize message"
);
Expand Down
4 changes: 2 additions & 2 deletions proto/src/connection/fallible/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ impl Connection for Fallible {
/// # Errors
/// Errors if we either failed to open the stream or send the message over that stream.
/// This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()> {
async fn send_message<M: AsRef<Message>>(&self, message: M) -> Result<()> {
// Lock the stream so we don't send message/message sizes interleaved
let mut sender_guard = self.sender.lock().await;

// Serialize the message
let serialized_message = bail!(
message.serialize(),
message.as_ref().serialize(),
Serialize,
"failed to serialize message"
);
Expand Down
102 changes: 102 additions & 0 deletions proto/src/connection/flow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! This file defines the connection and authentication flows
//! to be used when connecting and reconnecting.

use std::time::{SystemTime, UNIX_EPOCH};

use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;

use crate::{
bail,
connection::Connection,
crypto::{self, DeterministicRng},
error::{Error, Result},
message::{AuthenticateWithKey, Message},
};

pub trait Flow<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
>
{
async fn connect(
endpoint: String,
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
) -> Result<ConnectionType>;
}


pub struct ToMarshal {}

impl<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
> Flow<SignatureScheme, ConnectionType> for ToMarshal
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize,
{
async fn connect(
endpoint: String,
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
) -> Result<ConnectionType> {
let connection = bail!(
ConnectionType::connect(endpoint).await,
Connection,
"failed to connect to remote"
);

// Get the current timestamp, which we sign to avoid replay attacks
let timestamp = bail!(
SystemTime::now().duration_since(UNIX_EPOCH),
Parse,
"failed to get timestamp: time went backwards"
)
.as_secs();

// Sign the timestamp from above
let signature = bail!(
SignatureScheme::sign(
&(),
&signing_key,
timestamp.to_le_bytes(),
&mut DeterministicRng(0),
),
Crypto,
"failed to sign message"
);

// Serialize the verify key
let verification_key_bytes = bail!(
crypto::serialize(verification_key),
Serialize,
"failed to serialize verification key"
);

// Serialize the signature
let signature_bytes = bail!(
crypto::serialize(&signature),
Serialize,
"failed to serialize signature"
);

let message = Message::AuthenticateWithKey(AuthenticateWithKey {
timestamp,
verification_key: verification_key_bytes,
signature: signature_bytes,
});

// Create and send the authentication message from the above operations
// bail!(
// connection.send_message(message).await,
// Connection,
// "failed to send message"
// );

// Ok(connection)
todo!()
}
}
3 changes: 2 additions & 1 deletion proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::{error::Result, message::Message};

pub mod fallible;
pub mod flow;
pub mod sticky;

pub trait Connection {
Expand All @@ -16,7 +17,7 @@ pub trait Connection {
///
/// # Errors
/// Errors if we fail to deliver the message. This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()>;
async fn send_message<M: AsRef<Message>>(&self, message: M) -> Result<()>;

/// Connect to a remote address, returning an instance of `Self`.
///
Expand Down
Loading

0 comments on commit 4d217af

Please sign in to comment.