Skip to content

Commit

Permalink
Merge pull request #56 from EspressoSystems/middleware-to-limiter
Browse files Browse the repository at this point in the history
`middleware` -> `limiter`
  • Loading branch information
rob-maron authored Aug 31, 2024
2 parents b1dbd28 + 53ce4d2 commit 7da3c1c
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 78 deletions.
13 changes: 7 additions & 6 deletions cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
mod metrics;
use cdn_proto::{
bail,
connection::{middleware::Middleware, protocols::Protocol as _},
connection::{limiter::Limiter, protocols::Protocol as _},
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct Config<R: RunDef> {
/// The discovery endpoint. We use this to maintain consistency between brokers and marshals.
pub discovery_endpoint: String,

/// The underlying (public) verification key, used to authenticate with other brokers.
pub keypair: KeyPair<Scheme<R::Broker>>,

/// An optional TLS CA cert path. If not specified, will use the local one.
Expand Down Expand Up @@ -90,8 +91,8 @@ struct Inner<R: RunDef> {
/// state or send messages.
connections: Arc<RwLock<Connections>>,

/// The shared middleware that we use for all connections.
middleware: Middleware,
/// The shared limiter that we use for all connections.
limiter: Limiter,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
Expand Down Expand Up @@ -221,8 +222,8 @@ impl<R: RunDef> Broker<R> {
})
.transpose()?;

// Create the globally shared middleware
let middleware = Middleware::new(global_memory_pool_size, None);
// Create the globally shared limiter
let limiter = Limiter::new(global_memory_pool_size, None);

// Create and return `Self` as wrapping an `Inner` (with things that we need to share)
Ok(Self {
Expand All @@ -231,7 +232,7 @@ impl<R: RunDef> Broker<R> {
identity: identity.clone(),
keypair,
connections: Arc::from(RwLock::from(Connections::new(identity))),
middleware,
limiter,
}),
metrics_bind_endpoint,
user_listener,
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/src/tasks/broker/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<Def: RunDef> Inner<Def> {
let connection =
// Our TCP protocol is unsecured, so the cert we use does not matter.
// Time out is at protocol level
match Protocol::<Def::Broker>::connect(&to_connect_endpoint, true, inner.middleware.clone()).await
match Protocol::<Def::Broker>::connect(&to_connect_endpoint, true, inner.limiter.clone()).await
{
Ok(connection) => connection,
Err(err) => {
Expand Down
4 changes: 1 addition & 3 deletions cdn-broker/src/tasks/broker/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ impl<Def: RunDef> Inner<Def> {
let inner = self.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection
.finalize(inner.middleware.clone())
.await
let Ok(connection) = unfinalized_connection.finalize(inner.limiter.clone()).await
else {
return;
};
Expand Down
4 changes: 1 addition & 3 deletions cdn-broker/src/tasks/user/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ impl<Def: RunDef> Inner<Def> {
let inner = self.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection
.finalize(inner.middleware.clone())
.await
let Ok(connection) = unfinalized_connection.finalize(inner.limiter.clone()).await
else {
return;
};
Expand Down
6 changes: 3 additions & 3 deletions cdn-broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use cdn_proto::{
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{Connection, Listener, Protocol, UnfinalizedConnection},
UserPublicKey,
},
Expand Down Expand Up @@ -190,14 +190,14 @@ async fn gen_connection_pairs<P: Protocol>(num: usize) -> Vec<(Connection, Conne
// Spawn a task to connect the user to the broker
let bind_endpoint_ = bind_endpoint.clone();
let unfinalized_outgoing_connection =
spawn(async move { P::connect(&bind_endpoint_, true, Middleware::none()).await });
spawn(async move { P::connect(&bind_endpoint_, true, Limiter::none()).await });

// Accept the connection from the user
let incoming_connection = listener
.accept()
.await
.expect("failed to accept connection")
.finalize(Middleware::none())
.finalize(Limiter::none())
.await
.expect("failed to finalize connection");

Expand Down
11 changes: 5 additions & 6 deletions cdn-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{collections::HashSet, sync::Arc, time::Duration};
use cdn_proto::{
connection::{
auth::user::UserAuth,
middleware::Middleware,
limiter::Limiter,
protocols::{Connection, Protocol as _},
},
crypto::signature::KeyPair,
Expand Down Expand Up @@ -80,13 +80,12 @@ impl<C: ConnectionDef> Inner<C> {
/// - If the connection failed
/// - If authentication failed
async fn connect(self: &Arc<Self>) -> Result<Connection> {
// Create the middleware we will use for all connections
let middleware = Middleware::new(None, Some(1));
// Create the limiter we will use for all connections
let limiter = Limiter::new(None, Some(1));

// Make the connection to the marshal
let connection = bail!(
Protocol::<C>::connect(&self.endpoint, self.use_local_authority, middleware.clone())
.await,
Protocol::<C>::connect(&self.endpoint, self.use_local_authority, limiter.clone()).await,
Connection,
"failed to connect to endpoint"
);
Expand All @@ -100,7 +99,7 @@ impl<C: ConnectionDef> Inner<C> {

// Make the connection to the broker
let connection = bail!(
Protocol::<C>::connect(&broker_endpoint, self.use_local_authority, middleware).await,
Protocol::<C>::connect(&broker_endpoint, self.use_local_authority, limiter).await,
Connection,
"failed to connect to broker"
);
Expand Down
16 changes: 8 additions & 8 deletions cdn-marshal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod handlers;
use cdn_proto::{
bail,
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{Listener as _, Protocol as _, UnfinalizedConnection},
},
crypto::tls::{generate_cert_from_ca, load_ca},
Expand Down Expand Up @@ -73,8 +73,8 @@ pub struct Marshal<R: RunDef> {
/// metrics are not exposed.
metrics_bind_endpoint: Option<SocketAddr>,

// The middleware to use for the connection
middleware: Middleware,
// The limiter to use for the connection
limiter: Limiter,
}

impl<R: RunDef> Marshal<R> {
Expand Down Expand Up @@ -131,15 +131,15 @@ impl<R: RunDef> Marshal<R> {
})
.transpose()?;

// Create the middleware
let middleware = Middleware::new(global_memory_pool_size, None);
// Create the limiter
let limiter = Limiter::new(global_memory_pool_size, None);

// Create `Self` from the `Listener`
Ok(Self {
listener: Arc::from(listener),
metrics_bind_endpoint,
discovery_client,
middleware,
limiter,
})
}

Expand All @@ -165,10 +165,10 @@ impl<R: RunDef> Marshal<R> {

// Create a task to handle the connection
let discovery_client = self.discovery_client.clone();
let middleware = self.middleware.clone();
let limiter = self.limiter.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection.finalize(middleware).await else {
let Ok(connection) = unfinalized_connection.finalize(limiter).await else {
return;
};

Expand Down
6 changes: 3 additions & 3 deletions cdn-proto/benches/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use cdn_proto::{
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{quic::Quic, tcp::Tcp, Connection, Listener, Protocol, UnfinalizedConnection},
Bytes,
},
Expand Down Expand Up @@ -70,13 +70,13 @@ fn set_up_bench<Proto: Protocol>(message_size: usize) -> (Runtime, Connection, C

// Finalize the connection
unfinalized_connection
.finalize(Middleware::none())
.finalize(Limiter::none())
.await
.expect("failed to finalize connection")
});

// Attempt to connect
let conn1 = Proto::connect(&format!("127.0.0.1:{port}"), true, Middleware::none())
let conn1 = Proto::connect(&format!("127.0.0.1:{port}"), true, Limiter::none())
.await
.expect("failed to connect to listener");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ use self::pool::AllocationPermit;

pub mod pool;

/// Shared middleware for all connections.
/// Shared limiter for all connections.
#[derive(Clone)]
pub struct Middleware {
pub struct Limiter {
/// The global memory pool to check with before allocating.
global_memory_pool: Option<MemoryPool>,

/// Per connection, the size of the channel buffer.
connection_message_pool_size: Option<usize>,
}

impl Middleware {
/// Create a new middleware with a global memory pool of `global_memory_pool_size` bytes
impl Limiter {
/// Create a new limiter with a global memory pool of `global_memory_pool_size` bytes
/// and a connection message pool size of `connection_message_pool_size` bytes.
///
/// If the global memory pool is not set, it will not be used.
Expand All @@ -37,11 +37,11 @@ impl Middleware {
}
}

/// Create a new middleware with no global memory pool and no connection message pool size.
/// Create a new limiter with no global memory pool and no connection message pool size.
/// This means an unbounded channel will be used for connections and no global memory pool
/// will be checked.
pub const fn none() -> Self {
// Create a new middleware with no global memory pool and no connection message pool size.
// Create a new limiter with no global memory pool and no connection message pool size.
Self {
global_memory_pool: None,
connection_message_pool_size: None,
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions cdn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
use std::sync::Arc;

pub mod auth;
pub mod middleware;
pub mod limiter;
pub mod protocols;

use self::middleware::pool::Allocation;
use self::limiter::pool::Allocation;

/// Some type aliases to help with readability
pub type Bytes = Allocation<Vec<u8>>;
Expand Down
13 changes: 6 additions & 7 deletions cdn-proto/src/connection/protocols/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
use super::{Connection, Listener, Protocol, SoftClose, UnfinalizedConnection};
use crate::{
bail,
connection::middleware::Middleware,
connection::limiter::Limiter,
error::{Error, Result},
};

Expand All @@ -47,7 +47,7 @@ impl Protocol for Memory {
async fn connect(
remote_endpoint: &str,
_use_local_authority: bool,
middleware: Middleware,
limiter: Limiter,
) -> Result<Connection> {
// If the peer is not listening, return an error
// Get or initialize the channels as a static value
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Protocol for Memory {
);

// Convert the streams into a `Connection`
let connection = Connection::from_streams(send_to_them, receive_from_them, middleware);
let connection = Connection::from_streams(send_to_them, receive_from_them, limiter);

// Return our connection
Ok(connection)
Expand Down Expand Up @@ -121,10 +121,9 @@ pub struct UnfinalizedMemoryConnection {
#[async_trait]
impl UnfinalizedConnection for UnfinalizedMemoryConnection {
/// Prepares the `MemoryConnection` for usage by `Arc()ing` things.
async fn finalize(self, middleware: Middleware) -> Result<Connection> {
async fn finalize(self, limiter: Limiter) -> Result<Connection> {
// Convert the streams into a `Connection`
let connection =
Connection::from_streams(self.send_stream, self.receive_stream, middleware);
let connection = Connection::from_streams(self.send_stream, self.receive_stream, limiter);

// Return our connection
Ok(connection)
Expand Down Expand Up @@ -196,7 +195,7 @@ impl Memory {
let (sender, receiver) = duplex(8192);

// Convert the streams into a `Connection`
Connection::from_streams(sender, receiver, Middleware::none())
Connection::from_streams(sender, receiver, Limiter::none())
}
}

Expand Down
Loading

0 comments on commit 7da3c1c

Please sign in to comment.