Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegate sending, improve testing infrastructure #44

Merged
merged 33 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
caeea65
delegate sending
rob-maron Jun 4, 2024
7358e4b
clippy and fmt
rob-maron Jun 4, 2024
73b9bfb
close channel on drop
rob-maron Jun 5, 2024
46d5dd3
move channel closes
rob-maron Jun 5, 2024
25d44ce
Merge remote-tracking branch 'origin/master' into rm/delegated-sending
rob-maron Jun 5, 2024
db997e6
merge
rob-maron Jun 5, 2024
4ba4714
move out bidirectional open
rob-maron Jun 5, 2024
c07b3cd
single cancel for handlers
rob-maron Jun 5, 2024
93a6bb3
soft close
rob-maron Jun 6, 2024
2ca8f56
fmt
rob-maron Jun 6, 2024
55495ea
Merge remote-tracking branch 'origin/master' into rm/delegated-sending
rob-maron Jun 6, 2024
6a53bd2
trusted middleware for client connections
rob-maron Jun 6, 2024
e72666c
move middleware
rob-maron Jun 6, 2024
107a56b
clippy
rob-maron Jun 6, 2024
1075899
conditional dep for console
rob-maron Jun 6, 2024
9aa5370
change log to info
rob-maron Jun 6, 2024
07be574
package updates
rob-maron Jun 7, 2024
5b13eec
more benchmark macros
rob-maron Jun 7, 2024
4997175
minor updates to testing harness
rob-maron Jun 7, 2024
fe2fc82
Revert "minor updates to testing harness"
rob-maron Jun 7, 2024
2d7902d
vastly improve testing infra
rob-maron Jun 7, 2024
ff68ff2
add latency calculation
rob-maron Jun 10, 2024
ec31d16
update process compose
rob-maron Jun 10, 2024
c5069af
add metrics to second broker
rob-maron Jun 10, 2024
46321a5
add per-message latency calculation
rob-maron Jun 10, 2024
4ee374a
clippy
rob-maron Jun 10, 2024
d8206f8
implement tcp+tls
rob-maron Jun 10, 2024
18f7995
clippy lints
rob-maron Jun 10, 2024
21f2ee3
allow lint for test
rob-maron Jun 10, 2024
61900cb
fmt
rob-maron Jun 10, 2024
7b48e28
change prometheus label
rob-maron Jun 10, 2024
8e408f6
update dependencies
rob-maron Jun 10, 2024
07b67b6
only calculate latency if count difference != 0
rob-maron Jun 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 304 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cdn-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ path = "src/binaries/bad-broker.rs"

# This dependency is used for the Tokio console
[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber = "0.2"
console-subscriber = "0.3"


[dependencies]
Expand Down
24 changes: 16 additions & 8 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Broadcast, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -49,11 +50,14 @@ fn bench_broadcast_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global.into()]),
TestUser::with_index(1, vec![TestTopic::Global.into()]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand All @@ -71,14 +75,18 @@ fn bench_broadcast_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![]],
connected_users: vec![TestUser::with_index(0, vec![])],
connected_brokers: vec![
(vec![], vec![TestTopic::Global as u8]),
(vec![], vec![TestTopic::Global as u8]),
TestBroker {
connected_users: vec![TestUser::with_index(1, vec![TestTopic::Global.into()])],
},
TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global.into()])],
},
],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand Down
48 changes: 31 additions & 17 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, at_index, send_message_as};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Direct, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand All @@ -15,7 +16,7 @@ use pprof::criterion::{Output, PProfProfiler};
async fn direct_user_to_self(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -29,7 +30,7 @@ async fn direct_user_to_self(run: &TestRun) {
async fn direct_user_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![1],
recipient: at_index![1],
message: vec![0; 10000],
});

Expand All @@ -43,7 +44,7 @@ async fn direct_user_to_user(run: &TestRun) {
async fn direct_user_to_broker(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![2],
recipient: at_index![2],
message: vec![0; 10000],
});

Expand All @@ -57,7 +58,7 @@ async fn direct_user_to_broker(run: &TestRun) {
async fn direct_broker_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -76,11 +77,11 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8]],
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -99,11 +100,14 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -122,11 +126,16 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -145,11 +154,16 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand Down
2 changes: 2 additions & 0 deletions cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
use tokio::{spawn, time::sleep};
#[cfg(not(tokio_unstable))]
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ async fn main() -> Result<()> {
public_advertise_endpoint: format!("local_ip:{public_port}"),
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
global_memory_pool_size: None,
};

// Create new `Broker`
Expand Down
9 changes: 9 additions & 0 deletions cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
#[cfg(not(tokio_unstable))]
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -52,6 +53,13 @@ struct Args {
/// The seed for broker key generation
#[arg(short, long, default_value_t = 0)]
key_seed: u64,

/// The size of the global memory pool (in bytes). This is the maximum number of bytes that
/// can be allocated at once for all connections. A connection will block if it
/// tries to allocate more than this amount until some memory is freed.
/// Default is 1GB.
#[arg(long, default_value_t = 1_073_741_824)]
global_memory_pool_size: usize,
}

#[tokio::main]
Expand Down Expand Up @@ -96,6 +104,7 @@ async fn main() -> Result<()> {
public_advertise_endpoint: args.public_advertise_endpoint,
private_bind_endpoint: args.private_bind_endpoint,
private_advertise_endpoint: args.private_advertise_endpoint,
global_memory_pool_size: Some(args.global_memory_pool_size),
};

// Create new `Broker`
Expand Down
2 changes: 2 additions & 0 deletions cdn-broker/src/connections/broadcast/relational_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ impl<K: Hash + PartialEq + Eq + Clone, V: Hash + PartialEq + Eq + Clone> Relatio
}

#[cfg(test)]
// Makes tests more readable
#[allow(clippy::unnecessary_get_then_check)]
pub mod tests {
use super::RelationalMap;

Expand Down
Loading