Skip to content

Commit

Permalink
introduce message hooking
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Sep 4, 2024
1 parent 7da3c1c commit 42461b0
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 9 deletions.
9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
use std::time::Duration;

use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -76,6 +80,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
global_memory_pool_size: None,

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
//! The following is the main `Broker` binary, which just instantiates and runs
//! a `Broker` object.
use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -111,6 +115,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: args.private_bind_endpoint,
private_advertise_endpoint: args.private_advertise_endpoint,
global_memory_pool_size: Some(args.global_memory_pool_size),

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
19 changes: 18 additions & 1 deletion cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use cdn_proto::{
bail,
connection::{limiter::Limiter, protocols::Protocol as _},
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, RunDef, Scheme},
def::{MessageHook, Listener, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
error::{Error, Result},
util::AbortOnDropHandle,
Expand Down Expand Up @@ -74,6 +74,12 @@ pub struct Config<R: RunDef> {
/// tries to allocate more than this amount until some memory is freed.
/// Default is 1GB.
pub global_memory_pool_size: Option<usize>,

/// The hook we use when receiving incoming messages from users
pub user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
pub broker_message_hook: MessageHook<R::Broker>,
}

/// The broker `Inner` that we use to share common data between broker tasks.
Expand All @@ -93,6 +99,12 @@ struct Inner<R: RunDef> {

/// The shared limiter that we use for all connections.
limiter: Limiter,

/// The hook we use when receiving incoming messages from users
user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
broker_message_hook: MessageHook<R::Broker>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
Expand Down Expand Up @@ -136,6 +148,9 @@ impl<R: RunDef> Broker<R> {
ca_key_path,

global_memory_pool_size,

user_message_hook,
broker_message_hook,
} = config;

// Get the local IP address so we can replace in
Expand Down Expand Up @@ -233,6 +248,8 @@ impl<R: RunDef> Broker<R> {
keypair,
connections: Arc::from(RwLock::from(Connections::new(identity))),
limiter,
user_message_hook,
broker_message_hook,
}),
metrics_bind_endpoint,
user_listener,
Expand Down
12 changes: 11 additions & 1 deletion cdn-broker/src/tasks/broker/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{sync::Arc, time::Duration};
use cdn_proto::{
authenticate_with_broker, bail,
connection::{auth::broker::BrokerAuth, protocols::Connection, Bytes, UserPublicKey},
def::RunDef,
def::{MessageHookDef, RunDef},
discovery::BrokerIdentifier,
error::{Error, Result},
message::{Message, Topic},
Expand Down Expand Up @@ -123,13 +123,23 @@ impl<Def: RunDef> Inner<Def> {
broker_identifier: &BrokerIdentifier,
connection: Connection,
) -> Result<()> {
// Clone the hook
let local_message_hook = self.broker_message_hook.clone();

loop {
// Receive a message from the broker
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;

// Call the hook for the broker
bail!(
local_message_hook.on_message_received(&message),
Connection,
"message hook returned error"
);

match message {
// If we receive a direct message from a broker, we want to send it to the user with that key
Message::Direct(ref direct) => {
Expand Down
13 changes: 12 additions & 1 deletion cdn-broker/src/tasks/user/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
use std::sync::Arc;
use std::time::Duration;

use cdn_proto::bail;
use cdn_proto::connection::{protocols::Connection, UserPublicKey};
use cdn_proto::def::{RunDef, Topic as _};
use cdn_proto::def::{MessageHookDef, RunDef, Topic as _};
use cdn_proto::error::{Error, Result};
use cdn_proto::util::mnemonic;
use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message};
Expand Down Expand Up @@ -97,13 +98,23 @@ impl<Def: RunDef> Inner<Def> {
public_key: &UserPublicKey,
connection: Connection,
) -> Result<()> {
// Clone the hook
let local_message_hook = self.user_message_hook.clone();

loop {
// Receive a message from the user
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;

// Call the hook for the user, returning on error
bail!(
local_message_hook.on_message_received(&message),
Connection,
"message hook returned error"
);

match message {
// If we get a direct message from a user, send it to both users and brokers.
Message::Direct(ref direct) => {
Expand Down
4 changes: 3 additions & 1 deletion cdn-broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cdn_proto::{
signature::KeyPair,
tls::{generate_cert_from_ca, LOCAL_CA_CERT, LOCAL_CA_KEY},
},
def::TestingRunDef,
def::{NoMessageHook, TestingRunDef},
discovery::BrokerIdentifier,
message::{Message, Topic},
};
Expand Down Expand Up @@ -240,6 +240,8 @@ async fn new_broker_under_test<B: Protocol, U: Protocol>() -> Broker<TestingRunD
global_memory_pool_size: None,
ca_cert_path: None,
ca_key_path: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create and return the broker
Expand Down
2 changes: 1 addition & 1 deletion cdn-proto/src/connection/auth/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<R: RunDef> BrokerAuth<R> {
let public_key_bytes = bail!(
keypair.public_key.serialize(),
Serialize,
"failed to serialize publi key"
"failed to serialize public key"
);

// We authenticate to the marshal with a key
Expand Down
26 changes: 25 additions & 1 deletion cdn-proto/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::crypto::signature::SignatureScheme;
use crate::discovery::embedded::Embedded;
use crate::discovery::{redis::Redis, DiscoveryClient};
use crate::error::{Error, Result};
use crate::message::Message;
use anyhow::Result as AnyhowResult;

/// An implementation of `Topic` for testing purposes.
#[repr(u8)]
Expand Down Expand Up @@ -55,12 +57,29 @@ pub trait RunDef: 'static {
type Topic: Topic;
}

/// This trait defines the connection configuration for a single CDN component.
/// This trait defines the connection configuration for a single CDN component
pub trait ConnectionDef: 'static {
type Scheme: SignatureScheme;
type Protocol: ProtocolType;
type MessageHook: MessageHookDef;
}

/// This trait defines a hook that we use to perform additional actions on receiving a message
pub trait MessageHookDef: Send + Sync + 'static + Clone {
/// The hook that is called when a message is received. If the hook returns `true`, the message
/// will be processed as normal. If the hook returns `false`, the message will be ignored.
///
/// If the hook returns an error, the connection will be closed.
fn on_message_received(&self, _message: &Message) -> AnyhowResult<bool> {
Ok(true)
}
}

/// The no-op hook
#[derive(Clone)]
pub struct NoMessageHook;
impl MessageHookDef for NoMessageHook {}

/// The production run configuration.
/// Uses the real network protocols and Redis for discovery.
pub struct ProductionRunDef;
Expand All @@ -77,6 +96,7 @@ pub struct ProductionBrokerConnection;
impl ConnectionDef for ProductionBrokerConnection {
type Scheme = BLS;
type Protocol = Tcp;
type MessageHook = NoMessageHook;
}

/// The production user connection configuration.
Expand All @@ -85,6 +105,7 @@ pub struct ProductionUserConnection;
impl ConnectionDef for ProductionUserConnection {
type Scheme = BLS;
type Protocol = Quic;
type MessageHook = NoMessageHook;
}

/// The production client connection configuration.
Expand All @@ -95,6 +116,7 @@ pub struct ProductionClientConnection;
impl ConnectionDef for ProductionClientConnection {
type Scheme = Scheme<<ProductionRunDef as RunDef>::User>;
type Protocol = Protocol<<ProductionRunDef as RunDef>::User>;
type MessageHook = NoMessageHook;
}

/// The testing run configuration.
Expand All @@ -117,11 +139,13 @@ pub struct TestingConnection<P: ProtocolType> {
impl<P: ProtocolType> ConnectionDef for TestingConnection<P> {
type Scheme = BLS;
type Protocol = P;
type MessageHook = NoMessageHook;
}

// Type aliases to automatically disambiguate usage
pub type Scheme<A> = <A as ConnectionDef>::Scheme;
pub type PublicKey<A> = <Scheme<A> as SignatureScheme>::PublicKey;
pub type MessageHook<A> = <A as ConnectionDef>::MessageHook;

// Type aliases to automatically disambiguate usage
pub type Protocol<A> = <A as ConnectionDef>::Protocol;
Expand Down
4 changes: 3 additions & 1 deletion tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use cdn_marshal::{Config as MarshalConfig, Marshal};
use cdn_proto::{
connection::protocols::memory::Memory,
crypto::signature::{KeyPair, Serializable, SignatureScheme},
def::{TestingConnection, TestingRunDef},
def::{NoMessageHook, TestingConnection, TestingRunDef},
discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient},
message::Topic,
};
Expand Down Expand Up @@ -78,6 +78,8 @@ async fn new_broker(key: u64, public_ep: &str, private_ep: &str, discovery_ep: &
public_advertise_endpoint: public_ep.to_string(),
public_bind_endpoint: public_ep.to_string(),
global_memory_pool_size: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create broker
Expand Down

0 comments on commit 42461b0

Please sign in to comment.