From 8ad799042ec9ad5fa1369ee2737c06714acea2f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 16:56:52 +0100 Subject: [PATCH 01/20] bencher: tweak udp sets, update TODO --- TODO.md | 4 +++ crates/bencher/src/protocols/udp.rs | 46 +++++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/TODO.md b/TODO.md index 66fdd607..73ea5837 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,10 @@ ## High priority +* aquatic_bench + * consider removing 3 core sets + * check effects of channel size on RAM use and performance + * CI transfer test * add udp with io_uring * add HTTP without TLS diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index cf5e0239..7bae18bf 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -113,6 +113,7 @@ impl UdpCommand { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(2, 1), AquaticUdpRunner::new(3, 1), + AquaticUdpRunner::new(4, 1), AquaticUdpRunner::new(5, 1), ], UdpTracker::OpenTracker => vec![ @@ -123,13 +124,15 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12, 16]), }, 4 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(3, 1), AquaticUdpRunner::new(4, 1), + AquaticUdpRunner::new(5, 1), + AquaticUdpRunner::new(6, 1), AquaticUdpRunner::new(7, 1), ], UdpTracker::OpenTracker => vec![ @@ -140,14 +143,17 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), + load_test_runs: simple_load_test_runs(cpu_mode, &[6, 8, 12, 16]), }, 6 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(5, 1), + AquaticUdpRunner::new(6, 1), AquaticUdpRunner::new(10, 1), + AquaticUdpRunner::new(4, 2), + AquaticUdpRunner::new(6, 2), AquaticUdpRunner::new(8, 2), ], UdpTracker::OpenTracker => vec![ @@ -155,31 +161,44 @@ impl UdpCommand { OpenTrackerUdpRunner::new(12), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12]), + load_test_runs: simple_load_test_runs(cpu_mode, &[6, 8, 12, 16, 24]), }, 8 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(7, 1), + AquaticUdpRunner::new(8, 1), + AquaticUdpRunner::new(14, 1), AquaticUdpRunner::new(6, 2), AquaticUdpRunner::new(12, 2), AquaticUdpRunner::new(5, 3), + AquaticUdpRunner::new(10, 3), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(8), OpenTrackerUdpRunner::new(16), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), }, 12 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(10, 2), + AquaticUdpRunner::new(12, 2), + AquaticUdpRunner::new(20, 2), + AquaticUdpRunner::new(9, 3), + AquaticUdpRunner::new(12, 3), + AquaticUdpRunner::new(18, 3), + AquaticUdpRunner::new(8, 4), + AquaticUdpRunner::new(12, 4), AquaticUdpRunner::new(16, 4), - AquaticUdpRunner::new(9, 5), + + AquaticUdpRunner::new(7, 5), + AquaticUdpRunner::new(12, 5), + AquaticUdpRunner::new(14, 5), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(12), @@ -191,12 +210,29 @@ impl UdpCommand { 16 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(12, 2), + AquaticUdpRunner::new(16, 2), + AquaticUdpRunner::new(24, 2), + AquaticUdpRunner::new(13, 3), + AquaticUdpRunner::new(16, 3), + AquaticUdpRunner::new(26, 3), + AquaticUdpRunner::new(12, 4), + AquaticUdpRunner::new(16, 4), + AquaticUdpRunner::new(24, 4), + AquaticUdpRunner::new(11, 5), + AquaticUdpRunner::new(16, 5), + AquaticUdpRunner::new(22, 5), + AquaticUdpRunner::new(10, 6), + AquaticUdpRunner::new(16, 6), AquaticUdpRunner::new(20, 6), + AquaticUdpRunner::new(9, 7), + AquaticUdpRunner::new(16, 7), + AquaticUdpRunner::new(18, 7), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(16), From c4e644cb23fbb84fc2e2607b0ad9ae5f39bb7e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 16:57:46 +0100 Subject: [PATCH 02/20] udp: log (info-level) estimated channel memory use --- crates/udp/src/common.rs | 10 ++++++++++ crates/udp/src/lib.rs | 23 ++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 8cc322c2..6504f06b 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::hash::Hash; use std::io::Write; +use std::mem::size_of; use std::net::{SocketAddr, SocketAddrV4}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -101,6 +102,15 @@ pub struct ConnectedResponseWithAddr { pub addr: CanonicalSocketAddr, } +impl ConnectedResponseWithAddr { + pub fn estimated_max_size(config: &Config) -> usize { + size_of::() + + config.protocol.max_response_peers + * (size_of::>() + + size_of::>()) + } +} + pub struct Recycler; impl thingbuf::Recycle for Recycler { diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 6658b370..c9871ba3 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,6 +3,7 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; +use std::mem::size_of; use std::thread::Builder; use std::time::Duration; @@ -15,7 +16,7 @@ use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::{CanonicalSocketAddr, PanicSentinelWatcher, ServerStartInstant}; use common::{ ConnectedRequestSender, ConnectedResponseSender, Recycler, SocketWorkerIndex, State, @@ -24,12 +25,19 @@ use common::{ use config::Config; use workers::socket::ConnectionValidator; +use crate::common::{ConnectedRequest, ConnectedResponseWithAddr}; + pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + ::log::info!( + "Estimated max channel memory use: {:.02} MB", + est_max_total_channel_memory(&config) + ); + let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); @@ -206,3 +214,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } + +fn est_max_total_channel_memory(config: &Config) -> f64 { + let request_channel_max_size = config.swarm_workers + * config.worker_channel_size + * (size_of::() + + size_of::() + + size_of::()); + let response_channel_max_size = config.socket_workers + * config.worker_channel_size + * ConnectedResponseWithAddr::estimated_max_size(&config); + + (request_channel_max_size as u64 + response_channel_max_size as u64) as f64 / (1024.0 * 1024.0) +} From 4249a7f48dfd06a817ad9f34f5c399c25226f5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 16:59:08 +0100 Subject: [PATCH 03/20] udp: improve defaults for worker channel size and socket recv buffer --- crates/udp/src/config.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/udp/src/config.rs b/crates/udp/src/config.rs index baaabf78..bf8cc555 100644 --- a/crates/udp/src/config.rs +++ b/crates/udp/src/config.rs @@ -58,7 +58,7 @@ impl Default for Config { socket_workers: 1, swarm_workers: 1, log_level: LogLevel::Error, - worker_channel_size: 1024 * 128, + worker_channel_size: 4_096, request_channel_recv_timeout_ms: 100, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), @@ -92,11 +92,11 @@ pub struct NetworkConfig { /// values for different operating systems: /// /// macOS: - /// $ sudo sysctl net.inet.udp.recvspace=6000000 + /// $ sudo sysctl net.inet.udp.recvspace=8000000 /// /// Linux: - /// $ sudo sysctl -w net.core.rmem_max=104857600 - /// $ sudo sysctl -w net.core.rmem_default=104857600 + /// $ sudo sysctl -w net.core.rmem_max=8000000 + /// $ sudo sysctl -w net.core.rmem_default=8000000 pub socket_recv_buffer_size: usize, /// Poll timeout in milliseconds (mio backend only) pub poll_timeout_ms: u64, @@ -129,7 +129,7 @@ impl Default for NetworkConfig { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), only_ipv6: false, - socket_recv_buffer_size: 4096 * 128, + socket_recv_buffer_size: 8_000_000, poll_timeout_ms: 50, #[cfg(feature = "io-uring")] ring_size: 1024, From 6870a9e9c4a26ec47475ae09084a45f5aba59e3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 17:00:00 +0100 Subject: [PATCH 04/20] udp: lower log level from info to debug for several mio statements --- crates/udp/src/workers/socket/mio.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index fab0f678..64daf5f5 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -125,7 +125,7 @@ impl SocketWorker { self.read_and_handle_requests(pending_scrape_valid_until); } PollMode::SkipReceiving => { - ::log::info!("Postponing receiving requests because swarm worker channel is full. This means that the OS will be relied on to buffer incoming packets. To prevent this, raise config.worker_channel_size."); + ::log::debug!("Postponing receiving requests because swarm worker channel is full. This means that the OS will be relied on to buffer incoming packets. To prevent this, raise config.worker_channel_size."); self.polling_mode = PollMode::SkipPolling; } @@ -192,7 +192,7 @@ impl SocketWorker { match self.socket.recv_from(&mut self.buffer[..]) { Ok((bytes_read, src)) => { if src.port() == 0 { - ::log::info!("Ignored request from {} because source port is zero", src); + ::log::debug!("Ignored request from {} because source port is zero", src); continue; } @@ -501,7 +501,7 @@ impl SocketWorker { || (err.kind() == ErrorKind::WouldBlock) => { if resend_buffer.len() < config.network.resend_buffer_max_len { - ::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); + ::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); resend_buffer.push((response.into_owned(), canonical_addr)); } else { From e59890709569acca911f0b4d44447d0b1f4eb13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 17:12:04 +0100 Subject: [PATCH 05/20] setup-udp-bookworm.sh: print some software versions etc --- scripts/bench/setup-udp-bookworm.sh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/scripts/bench/setup-udp-bookworm.sh b/scripts/bench/setup-udp-bookworm.sh index 51c87777..88ecbee0 100755 --- a/scripts/bench/setup-udp-bookworm.sh +++ b/scripts/bench/setup-udp-bookworm.sh @@ -14,6 +14,7 @@ source "$HOME/.cargo/env" cargo build --profile "release-debug" -p aquatic_udp cargo build --profile "release-debug" -p aquatic_udp_load_test cargo build --profile "release-debug" -p aquatic_bencher --features udp +git log --oneline | head -n 1 cd $HOME mkdir -p projects @@ -30,6 +31,7 @@ sed -i "s/^OPTS_production=-O3/OPTS_production=-O3 -march=native -mtune=native/g sed -i "s/if \(numwant > 200\) numwant = 200/if (numwant > 50) numwant = 50/g" ot_udp.c make sudo cp ./opentracker /usr/local/bin/ +git log --oneline | head -n 1 cd .. # Install chihaya @@ -37,6 +39,12 @@ git clone https://github.com/chihaya/chihaya.git cd chihaya go build ./cmd/chihaya sudo cp ./chihaya /usr/local/bin/ +git log --oneline | head -n 1 cd .. +rustc --version +gcc --version +go version +lscpu + echo "Finished. Reboot before running aquatic_bencher" \ No newline at end of file From e76394b60d160906102528ec616b080e5d80b8fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 17:25:54 +0100 Subject: [PATCH 06/20] udp: decrease default worker_channel_size to 1024 --- crates/udp/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/udp/src/config.rs b/crates/udp/src/config.rs index bf8cc555..988f7992 100644 --- a/crates/udp/src/config.rs +++ b/crates/udp/src/config.rs @@ -58,7 +58,7 @@ impl Default for Config { socket_workers: 1, swarm_workers: 1, log_level: LogLevel::Error, - worker_channel_size: 4_096, + worker_channel_size: 1_024, request_channel_recv_timeout_ms: 100, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), From ff50b010fb191e1813d130f3e1005a753ee2111b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 17:26:44 +0100 Subject: [PATCH 07/20] bench: tweak udp sets --- crates/bencher/src/protocols/udp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 7bae18bf..e517beff 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -210,9 +210,9 @@ impl UdpCommand { 16 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(12, 2), + AquaticUdpRunner::new(14, 2), AquaticUdpRunner::new(16, 2), - AquaticUdpRunner::new(24, 2), + AquaticUdpRunner::new(28, 2), AquaticUdpRunner::new(13, 3), AquaticUdpRunner::new(16, 3), From 74155b4d792fe9ebf8ab86df48cdb3c215653423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Jan 2024 17:41:42 +0100 Subject: [PATCH 08/20] bench: udp: remove 3 cpu core sets to save runtime --- TODO.md | 1 - crates/bencher/src/protocols/udp.rs | 18 ------------------ 2 files changed, 19 deletions(-) diff --git a/TODO.md b/TODO.md index 73ea5837..fec4c915 100644 --- a/TODO.md +++ b/TODO.md @@ -3,7 +3,6 @@ ## High priority * aquatic_bench - * consider removing 3 core sets * check effects of channel size on RAM use and performance * CI transfer test diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index e517beff..735162d5 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -108,24 +108,6 @@ impl UdpCommand { }, load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), }, - 3 => SetConfig { - implementations: indexmap! { - UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(2, 1), - AquaticUdpRunner::new(3, 1), - AquaticUdpRunner::new(4, 1), - AquaticUdpRunner::new(5, 1), - ], - UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(3), - OpenTrackerUdpRunner::new(6), - ], - UdpTracker::Chihaya => vec![ - ChihayaUdpRunner::new(), - ], - }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12, 16]), - }, 4 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ From e18b50227c81489f00fca32a790215687055cfcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 16:32:10 +0100 Subject: [PATCH 09/20] bencher: implement load test run priorities --- crates/bencher/src/common.rs | 25 ++- crates/bencher/src/main.rs | 12 +- crates/bencher/src/protocols/udp.rs | 243 +++++++++++++++++----------- crates/bencher/src/run.rs | 4 +- crates/bencher/src/set.rs | 28 +++- 5 files changed, 212 insertions(+), 100 deletions(-) diff --git a/crates/bencher/src/common.rs b/crates/bencher/src/common.rs index 3cb4e116..6d176640 100644 --- a/crates/bencher/src/common.rs +++ b/crates/bencher/src/common.rs @@ -2,6 +2,23 @@ use std::{fmt::Display, ops::Range, thread::available_parallelism}; use itertools::Itertools; +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] +pub enum Priority { + Low, + Medium, + High, +} + +impl Display for Priority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Low => f.write_str("low"), + Self::Medium => f.write_str("medium"), + Self::High => f.write_str("high"), + } + } +} + #[derive(Debug, Clone)] pub struct TaskSetCpuList(pub Vec); @@ -141,13 +158,17 @@ pub enum CpuDirection { Desc, } -pub fn simple_load_test_runs(cpu_mode: CpuMode, workers: &[usize]) -> Vec<(usize, TaskSetCpuList)> { +pub fn simple_load_test_runs( + cpu_mode: CpuMode, + workers: &[(usize, Priority)], +) -> Vec<(usize, Priority, TaskSetCpuList)> { workers .into_iter() .copied() - .map(|workers| { + .map(|(workers, priority)| { ( workers, + priority, TaskSetCpuList::new(cpu_mode, CpuDirection::Desc, workers).unwrap(), ) }) diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 5494d5c8..fbbc4c6f 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -5,7 +5,7 @@ pub mod run; pub mod set; use clap::{Parser, Subcommand}; -use common::CpuMode; +use common::{CpuMode, Priority}; #[derive(Parser)] #[command(author, version, about)] @@ -20,6 +20,9 @@ struct Args { /// Maximum number of tracker cpu cores to run benchmarks for #[arg(long)] max_cores: Option, + /// Minimum benchmark priority + #[arg(long, default_value_t = Priority::Medium)] + min_priority: Priority, #[command(subcommand)] command: Command, } @@ -37,7 +40,12 @@ fn main() { match args.command { #[cfg(feature = "udp")] Command::Udp(command) => command - .run(args.cpu_mode, args.min_cores, args.max_cores) + .run( + args.cpu_mode, + args.min_cores, + args.max_cores, + args.min_priority, + ) .unwrap(), } } diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 735162d5..4180be55 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -11,7 +11,7 @@ use indoc::writedoc; use tempfile::NamedTempFile; use crate::{ - common::{simple_load_test_runs, CpuMode, TaskSetCpuList}, + common::{simple_load_test_runs, CpuMode, Priority, TaskSetCpuList}, run::ProcessRunner, set::{run_sets, SetConfig, Tracker}, }; @@ -55,173 +55,212 @@ impl UdpCommand { cpu_mode: CpuMode, min_cores: Option, max_cores: Option, + min_priority: Priority, ) -> anyhow::Result<()> { - let mut sets = self.sets(cpu_mode); - - if let Some(min_cores) = min_cores { - sets = sets.into_iter().filter(|(k, _)| *k >= min_cores).collect(); - } - if let Some(max_cores) = max_cores { - sets = sets.into_iter().filter(|(k, _)| *k <= max_cores).collect(); - } - - run_sets(self, cpu_mode, sets, |workers| { - Box::new(AquaticUdpLoadTestRunner { workers }) - }); + let sets = self.sets(cpu_mode); + + run_sets( + self, + cpu_mode, + min_cores, + max_cores, + min_priority, + sets, + |workers| Box::new(AquaticUdpLoadTestRunner { workers }), + ); Ok(()) } fn sets(&self, cpu_mode: CpuMode) -> IndexMap> { + // Priorities are based on what has previously produced the best results indexmap::indexmap! { 1 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(1, 1), - AquaticUdpRunner::new(2, 1), + AquaticUdpRunner::new(1, 1, Priority::High), + AquaticUdpRunner::new(2, 1, Priority::High), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(0), // Handle requests within event loop - OpenTrackerUdpRunner::new(1), - OpenTrackerUdpRunner::new(2), + OpenTrackerUdpRunner::new(0, Priority::Low), // Handle requests within event loop + OpenTrackerUdpRunner::new(1, Priority::Medium), + OpenTrackerUdpRunner::new(2, Priority::High), ], UdpTracker::Chihaya => vec![ ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (1, Priority::High), + (2, Priority::Medium), + (4, Priority::Medium), + (6, Priority::Medium), + (8, Priority::High) + ]), }, 2 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(1, 1), - AquaticUdpRunner::new(2, 1), - AquaticUdpRunner::new(3, 1), + AquaticUdpRunner::new(1, 1, Priority::Low), + AquaticUdpRunner::new(2, 1, Priority::Medium), + AquaticUdpRunner::new(3, 1, Priority::High), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(2), - OpenTrackerUdpRunner::new(4), + OpenTrackerUdpRunner::new(2, Priority::Medium), + OpenTrackerUdpRunner::new(4, Priority::High), ], UdpTracker::Chihaya => vec![ ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (1, Priority::Medium), + (2, Priority::Medium), + (4, Priority::Medium), + (6, Priority::Medium), + (8, Priority::High) + ]), }, 4 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(3, 1), - AquaticUdpRunner::new(4, 1), - AquaticUdpRunner::new(5, 1), - AquaticUdpRunner::new(6, 1), - AquaticUdpRunner::new(7, 1), + AquaticUdpRunner::new(3, 1, Priority::Low), + AquaticUdpRunner::new(4, 1, Priority::Low), + AquaticUdpRunner::new(5, 1, Priority::Medium), + AquaticUdpRunner::new(6, 1, Priority::Medium), + AquaticUdpRunner::new(7, 1, Priority::High), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(4), - OpenTrackerUdpRunner::new(8), + OpenTrackerUdpRunner::new(4, Priority::High), + OpenTrackerUdpRunner::new(8, Priority::Medium), ], UdpTracker::Chihaya => vec![ ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[6, 8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (6, Priority::High), + (8, Priority::Medium), + (12, Priority::High), + (16, Priority::Medium) + ]), }, 6 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(5, 1), - AquaticUdpRunner::new(6, 1), - AquaticUdpRunner::new(10, 1), + AquaticUdpRunner::new(5, 1, Priority::Medium), + AquaticUdpRunner::new(6, 1, Priority::Medium), + AquaticUdpRunner::new(10, 1, Priority::Low), - AquaticUdpRunner::new(4, 2), - AquaticUdpRunner::new(6, 2), - AquaticUdpRunner::new(8, 2), + AquaticUdpRunner::new(4, 2, Priority::Low), + AquaticUdpRunner::new(6, 2, Priority::Medium), + AquaticUdpRunner::new(8, 2, Priority::High), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(6), - OpenTrackerUdpRunner::new(12), + OpenTrackerUdpRunner::new(6, Priority::High), + OpenTrackerUdpRunner::new(12, Priority::Medium), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[6, 8, 12, 16, 24]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (6, Priority::Medium), + (8, Priority::Medium), + (12, Priority::High), + (16, Priority::High), + (24, Priority::Medium), + ]), }, 8 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(7, 1), - AquaticUdpRunner::new(8, 1), - AquaticUdpRunner::new(14, 1), - AquaticUdpRunner::new(6, 2), - AquaticUdpRunner::new(12, 2), - AquaticUdpRunner::new(5, 3), - AquaticUdpRunner::new(10, 3), + AquaticUdpRunner::new(7, 1, Priority::Medium), + AquaticUdpRunner::new(8, 1, Priority::Medium), + AquaticUdpRunner::new(14, 1, Priority::Low), + AquaticUdpRunner::new(6, 2, Priority::Low), + AquaticUdpRunner::new(12, 2, Priority::High), + AquaticUdpRunner::new(5, 3, Priority::Low), + AquaticUdpRunner::new(10, 3, Priority::Medium), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(8), - OpenTrackerUdpRunner::new(16), + OpenTrackerUdpRunner::new(8, Priority::High), + OpenTrackerUdpRunner::new(16, Priority::Medium), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (8, Priority::High), + (12, Priority::Medium), + (16, Priority::High), + (24, Priority::Medium) + ]), }, 12 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(10, 2), - AquaticUdpRunner::new(12, 2), - AquaticUdpRunner::new(20, 2), + AquaticUdpRunner::new(10, 2, Priority::Medium), + AquaticUdpRunner::new(12, 2, Priority::Medium), + AquaticUdpRunner::new(20, 2, Priority::Low), - AquaticUdpRunner::new(9, 3), - AquaticUdpRunner::new(12, 3), - AquaticUdpRunner::new(18, 3), + AquaticUdpRunner::new(9, 3, Priority::Low), + AquaticUdpRunner::new(12, 3, Priority::Medium), + AquaticUdpRunner::new(18, 3, Priority::Low), - AquaticUdpRunner::new(8, 4), - AquaticUdpRunner::new(12, 4), - AquaticUdpRunner::new(16, 4), + AquaticUdpRunner::new(8, 4, Priority::Low), + AquaticUdpRunner::new(12, 4, Priority::Medium), + AquaticUdpRunner::new(16, 4, Priority::High), - AquaticUdpRunner::new(7, 5), - AquaticUdpRunner::new(12, 5), - AquaticUdpRunner::new(14, 5), + AquaticUdpRunner::new(7, 5, Priority::Low), + AquaticUdpRunner::new(12, 5, Priority::Medium), + AquaticUdpRunner::new(14, 5, Priority::Medium), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(12), - OpenTrackerUdpRunner::new(24), + OpenTrackerUdpRunner::new(12, Priority::High), + OpenTrackerUdpRunner::new(24, Priority::Medium), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (8, Priority::Medium), + (12, Priority::Medium), + (16, Priority::High), + (24, Priority::High), + ]), }, 16 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(14, 2), - AquaticUdpRunner::new(16, 2), - AquaticUdpRunner::new(28, 2), + AquaticUdpRunner::new(14, 2, Priority::Low), + AquaticUdpRunner::new(16, 2, Priority::Low), + AquaticUdpRunner::new(28, 2, Priority::Low), - AquaticUdpRunner::new(13, 3), - AquaticUdpRunner::new(16, 3), - AquaticUdpRunner::new(26, 3), + AquaticUdpRunner::new(13, 3, Priority::Low), + AquaticUdpRunner::new(16, 3, Priority::Low), + AquaticUdpRunner::new(26, 3, Priority::Low), - AquaticUdpRunner::new(12, 4), - AquaticUdpRunner::new(16, 4), - AquaticUdpRunner::new(24, 4), + AquaticUdpRunner::new(12, 4, Priority::Medium), + AquaticUdpRunner::new(16, 4, Priority::Medium), + AquaticUdpRunner::new(24, 4, Priority::Low), - AquaticUdpRunner::new(11, 5), - AquaticUdpRunner::new(16, 5), - AquaticUdpRunner::new(22, 5), + AquaticUdpRunner::new(11, 5, Priority::Low), + AquaticUdpRunner::new(16, 5, Priority::Medium), + AquaticUdpRunner::new(22, 5, Priority::Low), - AquaticUdpRunner::new(10, 6), - AquaticUdpRunner::new(16, 6), - AquaticUdpRunner::new(20, 6), + AquaticUdpRunner::new(10, 6, Priority::Low), + AquaticUdpRunner::new(16, 6, Priority::High), + AquaticUdpRunner::new(20, 6, Priority::Medium), - AquaticUdpRunner::new(9, 7), - AquaticUdpRunner::new(16, 7), - AquaticUdpRunner::new(18, 7), + AquaticUdpRunner::new(9, 7, Priority::Low), + AquaticUdpRunner::new(16, 7, Priority::Medium), + AquaticUdpRunner::new(18, 7, Priority::Low), ], UdpTracker::OpenTracker => vec![ - OpenTrackerUdpRunner::new(16), - OpenTrackerUdpRunner::new(32), + OpenTrackerUdpRunner::new(16, Priority::High), + OpenTrackerUdpRunner::new(32, Priority::Medium), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), + load_test_runs: simple_load_test_runs(cpu_mode, &[ + (8, Priority::High), + (12, Priority::High), + (16, Priority::High), + (24, Priority::High), + ]), }, } } @@ -231,16 +270,19 @@ impl UdpCommand { struct AquaticUdpRunner { socket_workers: usize, swarm_workers: usize, + priority: Priority, } impl AquaticUdpRunner { fn new( socket_workers: usize, swarm_workers: usize, + priority: Priority, ) -> Rc> { Rc::new(Self { socket_workers, swarm_workers, + priority, }) } } @@ -275,6 +317,10 @@ impl ProcessRunner for AquaticUdpRunner { .spawn()?) } + fn priority(&self) -> crate::common::Priority { + self.priority + } + fn keys(&self) -> IndexMap { indexmap! { "socket workers".to_string() => self.socket_workers.to_string(), @@ -286,11 +332,12 @@ impl ProcessRunner for AquaticUdpRunner { #[derive(Debug, Clone)] struct OpenTrackerUdpRunner { workers: usize, + priority: Priority, } impl OpenTrackerUdpRunner { - fn new(workers: usize) -> Rc> { - Rc::new(Self { workers }) + fn new(workers: usize, priority: Priority) -> Rc> { + Rc::new(Self { workers, priority }) } } @@ -320,6 +367,10 @@ impl ProcessRunner for OpenTrackerUdpRunner { .spawn()?) } + fn priority(&self) -> crate::common::Priority { + self.priority + } + fn keys(&self) -> IndexMap { indexmap! { "workers".to_string() => self.workers.to_string(), @@ -332,7 +383,7 @@ struct ChihayaUdpRunner; impl ChihayaUdpRunner { fn new() -> Rc> { - Rc::new(Self) + Rc::new(Self {}) } } @@ -372,6 +423,10 @@ impl ProcessRunner for ChihayaUdpRunner { .spawn()?) } + fn priority(&self) -> crate::common::Priority { + Priority::High + } + fn keys(&self) -> IndexMap { Default::default() } @@ -415,6 +470,12 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { .spawn()?) } + fn priority(&self) -> crate::common::Priority { + eprintln!("load test runner priority method called"); + + Priority::High + } + fn keys(&self) -> IndexMap { indexmap! { "workers".to_string() => self.workers.to_string(), diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index d3b02107..0f0fc4fe 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -12,7 +12,7 @@ use once_cell::sync::Lazy; use regex::Regex; use tempfile::NamedTempFile; -use crate::common::TaskSetCpuList; +use crate::common::{Priority, TaskSetCpuList}; pub trait ProcessRunner: ::std::fmt::Debug { type Command; @@ -26,6 +26,8 @@ pub trait ProcessRunner: ::std::fmt::Debug { fn keys(&self) -> IndexMap; + fn priority(&self) -> Priority; + fn info(&self) -> String { self.keys() .into_iter() diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index d4f7f803..a7d59c04 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -5,7 +5,7 @@ use indexmap::IndexMap; use num_format::{Locale, ToFormattedString}; use crate::{ - common::{CpuDirection, CpuMode, TaskSetCpuList}, + common::{CpuDirection, CpuMode, Priority, TaskSetCpuList}, html::{html_all_runs, html_best_results}, run::{ProcessRunner, ProcessStats, RunConfig}, }; @@ -16,19 +16,39 @@ pub trait Tracker: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash { pub struct SetConfig { pub implementations: IndexMap>>>, - pub load_test_runs: Vec<(usize, TaskSetCpuList)>, + pub load_test_runs: Vec<(usize, Priority, TaskSetCpuList)>, } pub fn run_sets( command: &C, cpu_mode: CpuMode, - set_configs: IndexMap>, + min_cores: Option, + max_cores: Option, + min_priority: Priority, + mut set_configs: IndexMap>, load_test_gen: F, ) where C: ::std::fmt::Debug, I: Tracker, F: Fn(usize) -> Box>, { + if let Some(min_cores) = min_cores { + set_configs.retain(|cores, _| *cores >= min_cores); + } + if let Some(max_cores) = max_cores { + set_configs.retain(|cores, _| *cores <= max_cores); + } + + for set_config in set_configs.values_mut() { + for runners in set_config.implementations.values_mut() { + runners.retain(|r| r.priority() >= min_priority); + } + + set_config + .load_test_runs + .retain(|(_, priority, _)| *priority >= min_priority); + } + println!("# Benchmark report"); let total_num_runs = set_configs @@ -75,7 +95,7 @@ pub fn run_sets( .load_test_runs .clone() .into_iter() - .map(|(workers, load_test_vcpus)| { + .map(|(workers, _, load_test_vcpus)| { LoadTestRunResults::produce( command, &load_test_gen, From 7863782413894ca7fccef0f71a7f72d1a52f47f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 17:07:06 +0100 Subject: [PATCH 10/20] udp load test: add config.summarize_last: only summarize last N seconds --- crates/udp_load_test/src/config.rs | 5 +++++ crates/udp_load_test/src/lib.rs | 27 ++++++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 8db6bae0..b1802575 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -21,6 +21,10 @@ pub struct Config { pub workers: u8, /// Run duration (quit and generate report after this many seconds) pub duration: usize, + /// Only report summary for the last N seconds of run + /// + /// 0 = include whole run + pub summarize_last: usize, pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] @@ -34,6 +38,7 @@ impl Default for Config { log_level: LogLevel::Error, workers: 1, duration: 0, + summarize_last: 0, network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index b8b3ebbe..d040c419 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -33,6 +33,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { panic!("Error: at least one weight must be larger than zero."); } + if config.summarize_last > config.duration { + panic!("Error: report_last_seconds can't be larger than duration"); + } + println!("Starting client with config: {:#?}", config); let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); @@ -103,7 +107,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let mut report_avg_scrape: Vec = Vec::new(); let mut report_avg_error: Vec = Vec::new(); - let interval = 5; + const INTERVAL: u64 = 5; let start_time = Instant::now(); let duration = Duration::from_secs(config.duration as u64); @@ -111,7 +115,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let mut last = start_time; let time_elapsed = loop { - thread::sleep(Duration::from_secs(interval)); + thread::sleep(Duration::from_secs(INTERVAL)); let requests = fetch_and_reset(&state.statistics.requests); let response_peers = fetch_and_reset(&state.statistics.response_peers); @@ -163,6 +167,15 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } }; + if config.summarize_last != 0 { + let split_at = (config.duration - config.summarize_last) / INTERVAL as usize; + + report_avg_connect = report_avg_connect.split_off(split_at); + report_avg_announce = report_avg_announce.split_off(split_at); + report_avg_scrape = report_avg_scrape.split_off(split_at); + report_avg_error = report_avg_error.split_off(split_at); + } + let len = report_avg_connect.len() as f64; let avg_connect: f64 = report_avg_connect.into_iter().sum::() / len; @@ -175,7 +188,15 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { println!(); println!("# aquatic load test report"); println!(); - println!("Test ran for {} seconds", time_elapsed.as_secs()); + println!( + "Test ran for {} seconds {}", + time_elapsed.as_secs(), + if config.summarize_last != 0 { + format!("(only last {} included in summary)", config.summarize_last) + } else { + "".to_string() + } + ); println!("Average responses per second: {:.2}", avg_total); println!(" - Connect responses: {:.2}", avg_connect); println!(" - Announce responses: {:.2}", avg_announce); From 85862f161a82fb6592083d252df04025f949c224 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 17:36:04 +0100 Subject: [PATCH 11/20] bencher: add args duration and summarize_last (seconds) --- crates/bencher/src/main.rs | 28 +++++++++++++++++--- crates/bencher/src/protocols/udp.rs | 41 +++++++++-------------------- crates/bencher/src/run.rs | 8 ++++-- crates/bencher/src/set.rs | 30 +++++++++++++++------ 4 files changed, 65 insertions(+), 42 deletions(-) diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index fbbc4c6f..1ced0407 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -6,6 +6,7 @@ pub mod set; use clap::{Parser, Subcommand}; use common::{CpuMode, Priority}; +use set::run_sets; #[derive(Parser)] #[command(author, version, about)] @@ -23,6 +24,17 @@ struct Args { /// Minimum benchmark priority #[arg(long, default_value_t = Priority::Medium)] min_priority: Priority, + /// How long to run each load test for + #[arg(long, default_value_t = 60)] + duration: usize, + /// Only include data for last N seconds of load test runs. + /// + /// Useful if the tracker/load tester combination is slow at reaching + /// maximum throughput + /// + /// 0 = use data for whole run + #[arg(long, default_value_t = 0)] + summarize_last: usize, #[command(subcommand)] command: Command, } @@ -39,13 +51,21 @@ fn main() { match args.command { #[cfg(feature = "udp")] - Command::Udp(command) => command - .run( + Command::Udp(command) => { + let sets = command.sets(args.cpu_mode); + let load_test_gen = protocols::udp::UdpCommand::load_test_gen; + + run_sets( + &command, args.cpu_mode, args.min_cores, args.max_cores, args.min_priority, - ) - .unwrap(), + args.duration, + args.summarize_last, + sets, + load_test_gen, + ); + } } } diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 4180be55..68151d8e 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -13,7 +13,7 @@ use tempfile::NamedTempFile; use crate::{ common::{simple_load_test_runs, CpuMode, Priority, TaskSetCpuList}, run::ProcessRunner, - set::{run_sets, SetConfig, Tracker}, + set::{LoadTestRunnerParameters, SetConfig, Tracker}, }; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -50,29 +50,7 @@ pub struct UdpCommand { } impl UdpCommand { - pub fn run( - &self, - cpu_mode: CpuMode, - min_cores: Option, - max_cores: Option, - min_priority: Priority, - ) -> anyhow::Result<()> { - let sets = self.sets(cpu_mode); - - run_sets( - self, - cpu_mode, - min_cores, - max_cores, - min_priority, - sets, - |workers| Box::new(AquaticUdpLoadTestRunner { workers }), - ); - - Ok(()) - } - - fn sets(&self, cpu_mode: CpuMode) -> IndexMap> { + pub fn sets(&self, cpu_mode: CpuMode) -> IndexMap> { // Priorities are based on what has previously produced the best results indexmap::indexmap! { 1 => SetConfig { @@ -264,6 +242,12 @@ impl UdpCommand { }, } } + + pub fn load_test_gen( + parameters: LoadTestRunnerParameters, + ) -> Box> { + Box::new(AquaticUdpLoadTestRunner { parameters }) + } } #[derive(Debug, Clone)] @@ -434,7 +418,7 @@ impl ProcessRunner for ChihayaUdpRunner { #[derive(Debug, Clone)] struct AquaticUdpLoadTestRunner { - workers: usize, + parameters: LoadTestRunnerParameters, } impl ProcessRunner for AquaticUdpLoadTestRunner { @@ -448,8 +432,9 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { ) -> anyhow::Result { let mut c = aquatic_udp_load_test::config::Config::default(); - c.workers = self.workers as u8; - c.duration = 60; + c.workers = self.parameters.workers as u8; + c.duration = self.parameters.duration; + c.summarize_last = self.parameters.summarize_last; c.requests.weight_connect = 0; c.requests.weight_announce = 100; @@ -478,7 +463,7 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { fn keys(&self) -> IndexMap { indexmap! { - "workers".to_string() => self.workers.to_string(), + "workers".to_string() => self.parameters.workers.to_string(), } } } diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 0f0fc4fe..27574042 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -45,7 +45,11 @@ pub struct RunConfig { } impl RunConfig { - pub fn run(self, command: &C) -> Result> { + pub fn run( + self, + command: &C, + duration: usize, + ) -> Result> { let mut tracker_config_file = NamedTempFile::new().unwrap(); let mut load_test_config_file = NamedTempFile::new().unwrap(); @@ -75,7 +79,7 @@ impl RunConfig { } }; - for _ in 0..59 { + for _ in 0..(duration - 1) { if let Ok(Some(status)) = tracker.0.try_wait() { return Err(RunErrorResults::new(self) .set_tracker_outputs(tracker) diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index a7d59c04..a34a3e70 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -10,6 +10,13 @@ use crate::{ run::{ProcessRunner, ProcessStats, RunConfig}, }; +#[derive(Debug, Clone, Copy)] +pub struct LoadTestRunnerParameters { + pub workers: usize, + pub duration: usize, + pub summarize_last: usize, +} + pub trait Tracker: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash { fn name(&self) -> String; } @@ -25,12 +32,14 @@ pub fn run_sets( min_cores: Option, max_cores: Option, min_priority: Priority, + duration: usize, + summarize_last: usize, mut set_configs: IndexMap>, load_test_gen: F, ) where C: ::std::fmt::Debug, I: Tracker, - F: Fn(usize) -> Box>, + F: Fn(LoadTestRunnerParameters) -> Box>, { if let Some(min_cores) = min_cores { set_configs.retain(|cores, _| *cores >= min_cores); @@ -59,7 +68,7 @@ pub fn run_sets( .sum::(); let (estimated_hours, estimated_minutes) = { - let minutes = (total_num_runs * 67) / 60; + let minutes = (total_num_runs * (duration + 7)) / 60; (minutes / 60, minutes % 60) }; @@ -96,13 +105,18 @@ pub fn run_sets( .clone() .into_iter() .map(|(workers, _, load_test_vcpus)| { + let load_test_parameters = LoadTestRunnerParameters { + workers, + duration, + summarize_last, + }; LoadTestRunResults::produce( command, &load_test_gen, + load_test_parameters, implementation, &tracker_run, tracker_vcpus.clone(), - workers, load_test_vcpus, ) }) @@ -188,26 +202,26 @@ impl LoadTestRunResults { pub fn produce( command: &C, load_test_gen: &F, + load_test_parameters: LoadTestRunnerParameters, implementation: I, tracker_process: &Rc>, tracker_vcpus: TaskSetCpuList, - workers: usize, load_test_vcpus: TaskSetCpuList, ) -> Self where C: ::std::fmt::Debug, I: Tracker, - F: Fn(usize) -> Box>, + F: Fn(LoadTestRunnerParameters) -> Box>, { println!( "### {} run ({}) (load test workers: {}, cpus: {})", implementation.name(), tracker_process.info(), - workers, + load_test_parameters.workers, load_test_vcpus.as_cpu_list() ); - let load_test_runner = load_test_gen(workers); + let load_test_runner = load_test_gen(load_test_parameters); let load_test_keys = load_test_runner.keys(); let run_config = RunConfig { @@ -217,7 +231,7 @@ impl LoadTestRunResults { load_test_vcpus: load_test_vcpus.clone(), }; - match run_config.run(command) { + match run_config.run(command, load_test_parameters.duration) { Ok(r) => { println!( "- Average responses per second: {}", From 255edf1b1690bb4eb5f21b5fc4876029d2d34bfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 17:38:06 +0100 Subject: [PATCH 12/20] bencher: always listen on localhost --- crates/bencher/src/protocols/udp.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 68151d8e..b5801273 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -1,5 +1,6 @@ use std::{ io::Write, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::PathBuf, process::{Child, Command, Stdio}, rc::Rc, @@ -284,6 +285,7 @@ impl ProcessRunner for AquaticUdpRunner { c.socket_workers = self.socket_workers; c.swarm_workers = self.swarm_workers; + c.network.address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000)); c.protocol.max_response_peers = 50; let c = toml::to_string_pretty(&c)?; @@ -336,7 +338,7 @@ impl ProcessRunner for OpenTrackerUdpRunner { ) -> anyhow::Result { writeln!( tmp_file, - "listen.udp.workers {}\nlisten.udp 0.0.0.0:3000", + "listen.udp.workers {}\nlisten.udp 127.0.0.1:3000", self.workers )?; From 1df1014798bc2989f4aad5ae58d66a256145a15b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 17:45:47 +0100 Subject: [PATCH 13/20] Update TODO --- TODO.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index fec4c915..76f665c8 100644 --- a/TODO.md +++ b/TODO.md @@ -3,10 +3,10 @@ ## High priority * aquatic_bench - * check effects of channel size on RAM use and performance + * Check if opentracker is slow to get up to speed, adjust bencher + * Maybe investigate aquatic memory use * CI transfer test - * add udp with io_uring * add HTTP without TLS * http From bbe09bd0dfe81ad5a51fc5a64a70619f1a49aa3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 19:49:28 +0100 Subject: [PATCH 14/20] udp load test: improve config defaults, add announce_peers_wanted field --- crates/udp_load_test/src/config.rs | 17 ++++++++++------- crates/udp_load_test/src/worker/request_gen.rs | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index b1802575..5571b11d 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -68,11 +68,11 @@ pub struct NetworkConfig { /// values for different operating systems: /// /// macOS: - /// $ sudo sysctl net.inet.udp.recvspace=6000000 + /// $ sudo sysctl net.inet.udp.recvspace=8000000 /// /// Linux: - /// $ sudo sysctl -w net.core.rmem_max=104857600 - /// $ sudo sysctl -w net.core.rmem_default=104857600 + /// $ sudo sysctl -w net.core.rmem_max=8000000 + /// $ sudo sysctl -w net.core.rmem_default=8000000 pub recv_buffer: usize, } @@ -81,8 +81,8 @@ impl Default for NetworkConfig { Self { multiple_client_ipv4s: true, first_port: 45_000, - poll_timeout: 276, - recv_buffer: 6_000_000, + poll_timeout: 1, + recv_buffer: 8_000_000, } } } @@ -94,6 +94,8 @@ pub struct RequestConfig { pub number_of_torrents: usize, /// Maximum number of torrents to ask about in scrape requests pub scrape_max_torrents: usize, + /// Ask for this number of peers in announce requests + pub announce_peers_wanted: i32, /// Probability that a generated request is a connect request as part /// of sum of the various weight arguments. pub weight_connect: usize, @@ -118,13 +120,14 @@ impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, - scrape_max_torrents: 50, + scrape_max_torrents: 10, + announce_peers_wanted: 30, weight_connect: 0, weight_announce: 100, weight_scrape: 1, torrent_gamma_shape: 0.2, torrent_gamma_scale: 100.0, - peer_seeder_probability: 0.25, + peer_seeder_probability: 0.75, additional_request_probability: 0.5, } } diff --git a/crates/udp_load_test/src/worker/request_gen.rs b/crates/udp_load_test/src/worker/request_gen.rs index a9183415..60a67985 100644 --- a/crates/udp_load_test/src/worker/request_gen.rs +++ b/crates/udp_load_test/src/worker/request_gen.rs @@ -160,8 +160,8 @@ fn create_announce_request( bytes_left, event: event.into(), ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(12345), - peers_wanted: NumberOfPeers::new(100), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(config.requests.announce_peers_wanted), port: torrent_peer.port, }) .into() From df13ae9399d4975aea5ccdf1ac100878acc07e19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 3 Jan 2024 19:51:09 +0100 Subject: [PATCH 15/20] bencher: update config defaults, use load test max_response_peers --- crates/bencher/src/main.rs | 4 ++-- crates/bencher/src/protocols/udp.rs | 7 ++++--- scripts/bench/setup-udp-bookworm.sh | 1 - 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 1ced0407..70cbc448 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -25,7 +25,7 @@ struct Args { #[arg(long, default_value_t = Priority::Medium)] min_priority: Priority, /// How long to run each load test for - #[arg(long, default_value_t = 60)] + #[arg(long, default_value_t = 90)] duration: usize, /// Only include data for last N seconds of load test runs. /// @@ -33,7 +33,7 @@ struct Args { /// maximum throughput /// /// 0 = use data for whole run - #[arg(long, default_value_t = 0)] + #[arg(long, default_value_t = 30)] summarize_last: usize, #[command(subcommand)] command: Command, diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index b5801273..5ea7eaf2 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -286,7 +286,7 @@ impl ProcessRunner for AquaticUdpRunner { c.socket_workers = self.socket_workers; c.swarm_workers = self.swarm_workers; c.network.address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000)); - c.protocol.max_response_peers = 50; + c.protocol.max_response_peers = 30; let c = toml::to_string_pretty(&c)?; @@ -391,8 +391,8 @@ impl ProcessRunner for ChihayaUdpRunner { udp: addr: "127.0.0.1:3000" private_key: "abcdefghijklmnopqrst" - max_numwant: 50 - default_numwant: 50 + max_numwant: 30 + default_numwant: 30 storage: name: "memory" "#, @@ -438,6 +438,7 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { c.duration = self.parameters.duration; c.summarize_last = self.parameters.summarize_last; + c.requests.announce_peers_wanted = 30; c.requests.weight_connect = 0; c.requests.weight_announce = 100; c.requests.weight_scrape = 1; diff --git a/scripts/bench/setup-udp-bookworm.sh b/scripts/bench/setup-udp-bookworm.sh index 88ecbee0..1e508f21 100755 --- a/scripts/bench/setup-udp-bookworm.sh +++ b/scripts/bench/setup-udp-bookworm.sh @@ -28,7 +28,6 @@ cd .. git clone git://erdgeist.org/opentracker cd opentracker sed -i "s/^OPTS_production=-O3/OPTS_production=-O3 -march=native -mtune=native/g" Makefile -sed -i "s/if \(numwant > 200\) numwant = 200/if (numwant > 50) numwant = 50/g" ot_udp.c make sudo cp ./opentracker /usr/local/bin/ git log --oneline | head -n 1 From 0eaa4475e2198963f36b2f36b61f418ed4480da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 4 Jan 2024 17:04:17 +0100 Subject: [PATCH 16/20] udp: index peers by packet src ip and provided port instead of peer_id --- CHANGELOG.md | 3 ++ TODO.md | 8 +++-- crates/udp/src/workers/swarm/storage.rs | 39 ++++++++++--------------- crates/udp_protocol/src/common.rs | 4 +-- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08f2c3fd..09708fcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ #### Changed +* Index peers by packet source IP and provided port, instead of by peer_id. + This prevents users from impersonating others and is likely also slightly + faster for IPv4 peers. * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed diff --git a/TODO.md b/TODO.md index 76f665c8..8f0b3ab2 100644 --- a/TODO.md +++ b/TODO.md @@ -3,8 +3,12 @@ ## High priority * aquatic_bench - * Check if opentracker is slow to get up to speed, adjust bencher - * Maybe investigate aquatic memory use + * Opentracker "slow to get up to speed", is it due to getting faster once + inserts are rarely needed since most ip-port combinations have been sent? + In that case, a shorter duration (e.g., 30 seconds) would be a good idea. + * Maybe investigate aquatic memory use. + * Would it use significantly less memory to store peers in an ArrayVec if + there are only, say, 2 of them? * CI transfer test * add HTTP without TLS diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 00452ecd..4feb80d9 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -157,7 +157,7 @@ impl TorrentMap { } pub struct TorrentData { - peers: IndexMap>, + peers: IndexMap, Peer>, num_seeders: usize, } @@ -184,7 +184,12 @@ impl TorrentData { let status = PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left); - let opt_removed_peer = self.peers.remove(&request.peer_id); + let peer_map_key = ResponsePeer { + ip_address, + port: request.port, + }; + + let opt_removed_peer = self.peers.remove(&peer_map_key); if let Some(Peer { is_seeder: true, .. @@ -208,20 +213,19 @@ impl TorrentData { rng, &self.peers, max_num_peers_to_take, - Peer::to_response_peer, + |k, _| *k, &mut response.peers, ); match status { PeerStatus::Leeching => { let peer = Peer { - ip_address, - port: request.port, + peer_id: request.peer_id, is_seeder: false, valid_until, }; - self.peers.insert(request.peer_id, peer); + self.peers.insert(peer_map_key, peer); if config.statistics.peer_clients && opt_removed_peer.is_none() { statistics_sender @@ -231,13 +235,12 @@ impl TorrentData { } PeerStatus::Seeding => { let peer = Peer { - ip_address, - port: request.port, + peer_id: request.peer_id, is_seeder: true, valid_until, }; - self.peers.insert(request.peer_id, peer); + self.peers.insert(peer_map_key, peer); self.num_seeders += 1; @@ -279,7 +282,7 @@ impl TorrentData { statistics_sender: &Sender, now: SecondsSinceServerStart, ) { - self.peers.retain(|peer_id, peer| { + self.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); if !keep { @@ -288,7 +291,7 @@ impl TorrentData { } if config.statistics.peer_clients { if let Err(_) = - statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) { // Should never happen in practice ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); @@ -315,22 +318,12 @@ impl Default for TorrentData { } #[derive(Clone, Debug)] -struct Peer { - ip_address: I, - port: Port, +struct Peer { + peer_id: PeerId, is_seeder: bool, valid_until: ValidUntil, } -impl Peer { - fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer { - ResponsePeer { - ip_address: peer.ip_address, - port: peer.port, - } - } -} - /// Extract response peers /// /// If there are more peers in map than `max_num_peers_to_take`, do a random diff --git a/crates/udp_protocol/src/common.rs b/crates/udp_protocol/src/common.rs index 829cd55e..a0043df7 100644 --- a/crates/udp_protocol/src/common.rs +++ b/crates/udp_protocol/src/common.rs @@ -5,7 +5,7 @@ pub use aquatic_peer_id::{PeerClient, PeerId}; use zerocopy::network_endian::{I32, I64, U16, U32}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; -pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + AsBytes {} +pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + std::hash::Hash + AsBytes {} #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(transparent)] @@ -91,7 +91,7 @@ impl PeerKey { } } -#[derive(PartialEq, Eq, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] +#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] pub struct ResponsePeer { pub ip_address: I, From 99791c7154db0dbd493e1e9f6f6e039b986358ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 4 Jan 2024 17:24:35 +0100 Subject: [PATCH 17/20] udp load test: store info hashes as Arc slice to save space --- crates/udp_load_test/src/common.rs | 2 +- crates/udp_load_test/src/lib.rs | 2 +- crates/udp_load_test/src/worker/request_gen.rs | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 6f407298..ade23acf 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -27,7 +27,7 @@ pub struct Statistics { #[derive(Clone)] pub struct LoadTestState { - pub info_hashes: Arc>, + pub info_hashes: Arc<[InfoHash]>, pub statistics: Arc, } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index d040c419..0ce9a51e 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -46,7 +46,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } let state = LoadTestState { - info_hashes: Arc::new(info_hashes), + info_hashes: Arc::from(info_hashes.into_boxed_slice()), statistics: Arc::new(Statistics::default()), }; diff --git a/crates/udp_load_test/src/worker/request_gen.rs b/crates/udp_load_test/src/worker/request_gen.rs index 60a67985..1f311f3f 100644 --- a/crates/udp_load_test/src/worker/request_gen.rs +++ b/crates/udp_load_test/src/worker/request_gen.rs @@ -13,7 +13,7 @@ use crate::utils::*; pub fn process_response( rng: &mut impl Rng, gamma: Gamma, - info_hashes: &Arc>, + info_hashes: &Arc<[InfoHash]>, config: &Config, torrent_peers: &mut TorrentPeerMap, response: Response, @@ -89,7 +89,7 @@ pub fn process_response( fn if_torrent_peer_move_and_create_random_request( config: &Config, rng: &mut impl Rng, - info_hashes: &Arc>, + info_hashes: &Arc<[InfoHash]>, torrent_peers: &mut TorrentPeerMap, transaction_id: TransactionId, ) -> Option { @@ -110,7 +110,7 @@ fn if_torrent_peer_move_and_create_random_request( fn create_random_request( config: &Config, rng: &mut impl Rng, - info_hashes: &Arc>, + info_hashes: &Arc<[InfoHash]>, transaction_id: TransactionId, torrent_peer: &TorrentPeer, ) -> Request { @@ -168,7 +168,7 @@ fn create_announce_request( } fn create_scrape_request( - info_hashes: &Arc>, + info_hashes: &Arc<[InfoHash]>, torrent_peer: &TorrentPeer, transaction_id: TransactionId, ) -> Request { @@ -192,7 +192,7 @@ fn create_torrent_peer( config: &Config, rng: &mut impl Rng, gamma: Gamma, - info_hashes: &Arc>, + info_hashes: &Arc<[InfoHash]>, connection_id: ConnectionId, ) -> TorrentPeer { let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); From 55516956ca5fe192ee134b1bcda6209fbc07493d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 4 Jan 2024 17:28:48 +0100 Subject: [PATCH 18/20] udp load test: store peer scrape_hash_indices as boxed slice --- crates/udp_load_test/src/common.rs | 2 +- crates/udp_load_test/src/worker/request_gen.rs | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index ade23acf..7cbffbdd 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -7,7 +7,7 @@ use aquatic_udp_protocol::*; #[derive(PartialEq, Eq, Clone)] pub struct TorrentPeer { pub info_hash: InfoHash, - pub scrape_hash_indeces: Vec, + pub scrape_hash_indices: Box<[usize]>, pub connection_id: ConnectionId, pub peer_id: PeerId, pub port: Port, diff --git a/crates/udp_load_test/src/worker/request_gen.rs b/crates/udp_load_test/src/worker/request_gen.rs index 1f311f3f..6ce23556 100644 --- a/crates/udp_load_test/src/worker/request_gen.rs +++ b/crates/udp_load_test/src/worker/request_gen.rs @@ -172,11 +172,11 @@ fn create_scrape_request( torrent_peer: &TorrentPeer, transaction_id: TransactionId, ) -> Request { - let indeces = &torrent_peer.scrape_hash_indeces; + let indeces = &torrent_peer.scrape_hash_indices; let mut scape_hashes = Vec::with_capacity(indeces.len()); - for i in indeces { + for i in indeces.iter() { scape_hashes.push(info_hashes[*i].to_owned()) } @@ -195,19 +195,18 @@ fn create_torrent_peer( info_hashes: &Arc<[InfoHash]>, connection_id: ConnectionId, ) -> TorrentPeer { - let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); + let num_scrape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); - let mut scrape_hash_indeces = Vec::new(); - - for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, gamma)) - } + let scrape_hash_indices = (0..num_scrape_hashes) + .map(|_| select_info_hash_index(config, rng, gamma)) + .collect::>() + .into_boxed_slice(); let info_hash_index = select_info_hash_index(config, rng, gamma); TorrentPeer { info_hash: info_hashes[info_hash_index], - scrape_hash_indeces, + scrape_hash_indices, connection_id, peer_id: generate_peer_id(), port: Port::new(rng.gen()), From 1e9b5c450c264ab196398aba86eb65c699fbba3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 4 Jan 2024 17:58:11 +0100 Subject: [PATCH 19/20] udp load test: use fixed rng seed --- crates/udp_load_test/src/worker/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs index a06e7249..66c61651 100644 --- a/crates/udp_load_test/src/worker/mod.rs +++ b/crates/udp_load_test/src/worker/mod.rs @@ -7,7 +7,7 @@ use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use rand::Rng; -use rand::{prelude::SmallRng, thread_rng, SeedableRng}; +use rand::{prelude::SmallRng, SeedableRng}; use rand_distr::Gamma; use socket2::{Domain, Protocol, Socket, Type}; @@ -28,7 +28,7 @@ pub fn run_worker_thread( let mut socket = UdpSocket::from_std(create_socket(config, addr)); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()"); + let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); let mut torrent_peers = TorrentPeerMap::default(); let token = Token(0); @@ -46,7 +46,7 @@ pub fn run_worker_thread( let mut statistics = SocketWorkerLocalStatistics::default(); // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut thread_rng())); + let initial_request = create_connect_request(generate_transaction_id(&mut rng)); send_request(&mut socket, &mut buffer, &mut statistics, initial_request); loop { From d48deeff8c460cbbbe1090c27a2bca5cce0c9651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 4 Jan 2024 18:34:18 +0100 Subject: [PATCH 20/20] udp load test: refactor worker --- crates/udp_load_test/src/lib.rs | 2 +- crates/udp_load_test/src/worker/mod.rs | 406 ++++++++++++------ .../udp_load_test/src/worker/request_gen.rs | 218 ---------- 3 files changed, 287 insertions(+), 339 deletions(-) delete mode 100644 crates/udp_load_test/src/worker/request_gen.rs diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 0ce9a51e..caa26eb6 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -84,7 +84,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, gamma, &config, addr) + Worker::run(state, gamma, config, addr) })?; } diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs index 66c61651..b3be4762 100644 --- a/crates/udp_load_test/src/worker/mod.rs +++ b/crates/udp_load_test/src/worker/mod.rs @@ -1,5 +1,3 @@ -mod request_gen; - use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; @@ -8,87 +6,75 @@ use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use rand::Rng; use rand::{prelude::SmallRng, SeedableRng}; -use rand_distr::Gamma; +use rand_distr::{Distribution, Gamma, WeightedIndex}; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; use crate::config::Config; use crate::{common::*, utils::*}; -use request_gen::process_response; const MAX_PACKET_SIZE: usize = 8192; -pub fn run_worker_thread( - state: LoadTestState, +pub struct Worker { + config: Config, + shared_state: LoadTestState, gamma: Gamma, - config: &Config, addr: SocketAddr, -) { - let mut socket = UdpSocket::from_std(create_socket(config, addr)); - let mut buffer = [0u8; MAX_PACKET_SIZE]; + socket: UdpSocket, + buffer: [u8; MAX_PACKET_SIZE], + rng: SmallRng, + torrent_peers: TorrentPeerMap, + statistics: SocketWorkerLocalStatistics, +} - let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); - let mut torrent_peers = TorrentPeerMap::default(); +impl Worker { + pub fn run(shared_state: LoadTestState, gamma: Gamma, config: Config, addr: SocketAddr) { + let socket = UdpSocket::from_std(create_socket(&config, addr)); + let buffer = [0u8; MAX_PACKET_SIZE]; + let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + let torrent_peers = TorrentPeerMap::default(); + let statistics = SocketWorkerLocalStatistics::default(); - let token = Token(0); - let interests = Interest::READABLE; - let timeout = Duration::from_micros(config.network.poll_timeout); + let mut instance = Self { + config, + shared_state, + gamma, + addr, + socket, + buffer, + rng, + torrent_peers, + statistics, + }; - let mut poll = Poll::new().expect("create poll"); + instance.run_inner(); + } - poll.registry() - .register(&mut socket, token, interests) - .unwrap(); + fn run_inner(&mut self) { + let mut poll = Poll::new().expect("create poll"); + let mut events = Events::with_capacity(1); - let mut events = Events::with_capacity(1); + poll.registry() + .register(&mut self.socket, Token(0), Interest::READABLE) + .unwrap(); - let mut statistics = SocketWorkerLocalStatistics::default(); + // Bootstrap request cycle + let initial_request = create_connect_request(generate_transaction_id(&mut self.rng)); + self.send_request(initial_request); - // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut rng)); - send_request(&mut socket, &mut buffer, &mut statistics, initial_request); + let timeout = Duration::from_micros(self.config.network.poll_timeout); - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); - for event in events.iter() { - if (event.token() == token) & event.is_readable() { - while let Ok(amt) = socket.recv(&mut buffer) { - match Response::from_bytes(&buffer[0..amt], addr.is_ipv4()) { + for _ in events.iter() { + while let Ok(amt) = self.socket.recv(&mut self.buffer) { + match Response::from_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { Ok(response) => { - match response { - Response::AnnounceIpv4(ref r) => { - statistics.responses_announce += 1; - statistics.response_peers += r.peers.len(); - } - Response::AnnounceIpv6(ref r) => { - statistics.responses_announce += 1; - statistics.response_peers += r.peers.len(); - } - Response::Scrape(_) => { - statistics.responses_scrape += 1; - } - Response::Connect(_) => { - statistics.responses_connect += 1; - } - Response::Error(_) => { - statistics.responses_error += 1; - } - } - - let opt_request = process_response( - &mut rng, - gamma, - &state.info_hashes, - &config, - &mut torrent_peers, - response, - ); - - if let Some(request) = opt_request { - send_request(&mut socket, &mut buffer, &mut statistics, request); + if let Some(request) = self.process_response(response) { + self.send_request(request); } } Err(err) => { @@ -97,79 +83,259 @@ pub fn run_worker_thread( } } - if rng.gen::() <= config.requests.additional_request_probability { + if self.rng.gen::() <= self.config.requests.additional_request_probability { let additional_request = - create_connect_request(generate_transaction_id(&mut rng)); + create_connect_request(generate_transaction_id(&mut self.rng)); - send_request( - &mut socket, - &mut buffer, - &mut statistics, - additional_request, - ); + self.send_request(additional_request); } - update_shared_statistics(&state, &mut statistics); + self.update_shared_statistics(); } } } -} -fn send_request( - socket: &mut UdpSocket, - buffer: &mut [u8], - statistics: &mut SocketWorkerLocalStatistics, - request: Request, -) { - let mut cursor = Cursor::new(buffer); - - match request.write(&mut cursor) { - Ok(()) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match socket.send(&inner[..position]) { - Ok(_) => { - statistics.requests += 1; + fn process_response(&mut self, response: Response) -> Option { + match response { + Response::Connect(r) => { + self.statistics.responses_connect += 1; + + // Fetch the torrent peer or create it if is doesn't exists. Update + // the connection id if fetched. Create a request and move the + // torrent peer appropriately. + + let mut torrent_peer = self + .torrent_peers + .remove(&r.transaction_id) + .unwrap_or_else(|| self.create_torrent_peer(r.connection_id)); + + torrent_peer.connection_id = r.connection_id; + + let new_transaction_id = generate_transaction_id(&mut self.rng); + let request = self.create_random_request(new_transaction_id, &torrent_peer); + + self.torrent_peers.insert(new_transaction_id, torrent_peer); + + Some(request) + } + Response::AnnounceIpv4(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) + } + Response::AnnounceIpv6(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) + } + Response::Scrape(r) => { + self.statistics.responses_scrape += 1; + + self.if_torrent_peer_move_and_create_random_request(r.transaction_id) + } + Response::Error(r) => { + self.statistics.responses_error += 1; + + if !r.message.to_lowercase().contains("connection") { + eprintln!( + "Received error response which didn't contain the word 'connection': {}", + r.message + ); } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); + + if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) { + let new_transaction_id = generate_transaction_id(&mut self.rng); + + self.torrent_peers.insert(new_transaction_id, torrent_peer); + + Some(create_connect_request(new_transaction_id)) + } else { + Some(create_connect_request(generate_transaction_id( + &mut self.rng, + ))) } } } - Err(err) => { - eprintln!("request_to_bytes err: {}", err); + } + + fn if_torrent_peer_move_and_create_random_request( + &mut self, + transaction_id: TransactionId, + ) -> Option { + let torrent_peer = self.torrent_peers.remove(&transaction_id)?; + + let new_transaction_id = generate_transaction_id(&mut self.rng); + + let request = self.create_random_request(new_transaction_id, &torrent_peer); + + self.torrent_peers.insert(new_transaction_id, torrent_peer); + + Some(request) + } + + fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer { + let num_scrape_hashes = self + .rng + .gen_range(1..self.config.requests.scrape_max_torrents); + + let scrape_hash_indices = (0..num_scrape_hashes) + .map(|_| self.random_info_hash_index()) + .collect::>() + .into_boxed_slice(); + + let info_hash_index = self.random_info_hash_index(); + + TorrentPeer { + info_hash: self.shared_state.info_hashes[info_hash_index], + scrape_hash_indices, + connection_id, + peer_id: generate_peer_id(), + port: Port::new(self.rng.gen()), } } -} -fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorkerLocalStatistics) { - state - .statistics - .requests - .fetch_add(statistics.requests, Ordering::Relaxed); - state - .statistics - .responses_connect - .fetch_add(statistics.responses_connect, Ordering::Relaxed); - state - .statistics - .responses_announce - .fetch_add(statistics.responses_announce, Ordering::Relaxed); - state - .statistics - .responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::Relaxed); - state - .statistics - .responses_error - .fetch_add(statistics.responses_error, Ordering::Relaxed); - state - .statistics - .response_peers - .fetch_add(statistics.response_peers, Ordering::Relaxed); - - *statistics = SocketWorkerLocalStatistics::default(); + fn create_random_request( + &mut self, + transaction_id: TransactionId, + torrent_peer: &TorrentPeer, + ) -> Request { + const ITEMS: [RequestType; 3] = [ + RequestType::Announce, + RequestType::Connect, + RequestType::Scrape, + ]; + + let weights = [ + self.config.requests.weight_announce as u32, + self.config.requests.weight_connect as u32, + self.config.requests.weight_scrape as u32, + ]; + + let dist = WeightedIndex::new(weights).expect("random request weighted index"); + + match ITEMS[dist.sample(&mut self.rng)] { + RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id), + RequestType::Connect => (ConnectRequest { transaction_id }).into(), + RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id), + } + } + + fn create_announce_request( + &mut self, + torrent_peer: &TorrentPeer, + transaction_id: TransactionId, + ) -> Request { + let (event, bytes_left) = { + if self + .rng + .gen_bool(self.config.requests.peer_seeder_probability) + { + (AnnounceEvent::Completed, NumberOfBytes::new(0)) + } else { + (AnnounceEvent::Started, NumberOfBytes::new(50)) + } + }; + + (AnnounceRequest { + connection_id: torrent_peer.connection_id, + action_placeholder: Default::default(), + transaction_id, + info_hash: torrent_peer.info_hash, + peer_id: torrent_peer.peer_id, + bytes_downloaded: NumberOfBytes::new(50), + bytes_uploaded: NumberOfBytes::new(50), + bytes_left, + event: event.into(), + ip_address: Ipv4AddrBytes([0; 4]), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + port: torrent_peer.port, + }) + .into() + } + + fn create_scrape_request( + &self, + torrent_peer: &TorrentPeer, + transaction_id: TransactionId, + ) -> Request { + let indeces = &torrent_peer.scrape_hash_indices; + + let mut scape_hashes = Vec::with_capacity(indeces.len()); + + for i in indeces.iter() { + scape_hashes.push(self.shared_state.info_hashes[*i].to_owned()) + } + + (ScrapeRequest { + connection_id: torrent_peer.connection_id, + transaction_id, + info_hashes: scape_hashes, + }) + .into() + } + + fn random_info_hash_index(&mut self) -> usize { + gamma_usize( + &mut self.rng, + self.gamma, + &self.config.requests.number_of_torrents - 1, + ) + } + + fn send_request(&mut self, request: Request) { + let mut cursor = Cursor::new(self.buffer); + + match request.write(&mut cursor) { + Ok(()) => { + let position = cursor.position() as usize; + let inner = cursor.get_ref(); + + match self.socket.send(&inner[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + Err(err) => { + eprintln!("request_to_bytes err: {}", err); + } + } + } + + fn update_shared_statistics(&mut self) { + self.shared_state + .statistics + .requests + .fetch_add(self.statistics.requests, Ordering::Relaxed); + self.shared_state + .statistics + .responses_connect + .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); + self.shared_state + .statistics + .responses_announce + .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); + self.shared_state + .statistics + .responses_scrape + .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); + self.shared_state + .statistics + .responses_error + .fetch_add(self.statistics.responses_error, Ordering::Relaxed); + self.shared_state + .statistics + .response_peers + .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + + self.statistics = SocketWorkerLocalStatistics::default(); + } } fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { diff --git a/crates/udp_load_test/src/worker/request_gen.rs b/crates/udp_load_test/src/worker/request_gen.rs deleted file mode 100644 index 6ce23556..00000000 --- a/crates/udp_load_test/src/worker/request_gen.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::sync::Arc; - -use rand::distributions::WeightedIndex; -use rand::prelude::*; -use rand_distr::Gamma; - -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; -use crate::utils::*; - -pub fn process_response( - rng: &mut impl Rng, - gamma: Gamma, - info_hashes: &Arc<[InfoHash]>, - config: &Config, - torrent_peers: &mut TorrentPeerMap, - response: Response, -) -> Option { - match response { - Response::Connect(r) => { - // Fetch the torrent peer or create it if is doesn't exists. Update - // the connection id if fetched. Create a request and move the - // torrent peer appropriately. - - let torrent_peer = torrent_peers - .remove(&r.transaction_id) - .map(|mut torrent_peer| { - torrent_peer.connection_id = r.connection_id; - - torrent_peer - }) - .unwrap_or_else(|| { - create_torrent_peer(config, rng, gamma, info_hashes, r.connection_id) - }); - - let new_transaction_id = generate_transaction_id(rng); - - let request = - create_random_request(config, rng, info_hashes, new_transaction_id, &torrent_peer); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - Response::AnnounceIpv4(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.fixed.transaction_id, - ), - Response::AnnounceIpv6(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.fixed.transaction_id, - ), - Response::Scrape(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.transaction_id, - ), - Response::Error(r) => { - if !r.message.to_lowercase().contains("connection") { - eprintln!( - "Received error response which didn't contain the word 'connection': {}", - r.message - ); - } - - if let Some(torrent_peer) = torrent_peers.remove(&r.transaction_id) { - let new_transaction_id = generate_transaction_id(rng); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(create_connect_request(new_transaction_id)) - } else { - Some(create_connect_request(generate_transaction_id(rng))) - } - } - } -} - -fn if_torrent_peer_move_and_create_random_request( - config: &Config, - rng: &mut impl Rng, - info_hashes: &Arc<[InfoHash]>, - torrent_peers: &mut TorrentPeerMap, - transaction_id: TransactionId, -) -> Option { - if let Some(torrent_peer) = torrent_peers.remove(&transaction_id) { - let new_transaction_id = generate_transaction_id(rng); - - let request = - create_random_request(config, rng, info_hashes, new_transaction_id, &torrent_peer); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } else { - None - } -} - -fn create_random_request( - config: &Config, - rng: &mut impl Rng, - info_hashes: &Arc<[InfoHash]>, - transaction_id: TransactionId, - torrent_peer: &TorrentPeer, -) -> Request { - const ITEMS: [RequestType; 3] = [ - RequestType::Announce, - RequestType::Connect, - RequestType::Scrape, - ]; - - let weights = [ - config.requests.weight_announce as u32, - config.requests.weight_connect as u32, - config.requests.weight_scrape as u32, - ]; - - let dist = WeightedIndex::new(weights).expect("random request weighted index"); - - match ITEMS[dist.sample(rng)] { - RequestType::Announce => create_announce_request(config, rng, torrent_peer, transaction_id), - RequestType::Connect => create_connect_request(transaction_id), - RequestType::Scrape => create_scrape_request(&info_hashes, torrent_peer, transaction_id), - } -} - -fn create_announce_request( - config: &Config, - rng: &mut impl Rng, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, -) -> Request { - let (event, bytes_left) = { - if rng.gen_bool(config.requests.peer_seeder_probability) { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) - } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) - } - }; - - (AnnounceRequest { - connection_id: torrent_peer.connection_id, - action_placeholder: Default::default(), - transaction_id, - info_hash: torrent_peer.info_hash, - peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), - bytes_left, - event: event.into(), - ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(config.requests.announce_peers_wanted), - port: torrent_peer.port, - }) - .into() -} - -fn create_scrape_request( - info_hashes: &Arc<[InfoHash]>, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, -) -> Request { - let indeces = &torrent_peer.scrape_hash_indices; - - let mut scape_hashes = Vec::with_capacity(indeces.len()); - - for i in indeces.iter() { - scape_hashes.push(info_hashes[*i].to_owned()) - } - - (ScrapeRequest { - connection_id: torrent_peer.connection_id, - transaction_id, - info_hashes: scape_hashes, - }) - .into() -} - -fn create_torrent_peer( - config: &Config, - rng: &mut impl Rng, - gamma: Gamma, - info_hashes: &Arc<[InfoHash]>, - connection_id: ConnectionId, -) -> TorrentPeer { - let num_scrape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); - - let scrape_hash_indices = (0..num_scrape_hashes) - .map(|_| select_info_hash_index(config, rng, gamma)) - .collect::>() - .into_boxed_slice(); - - let info_hash_index = select_info_hash_index(config, rng, gamma); - - TorrentPeer { - info_hash: info_hashes[info_hash_index], - scrape_hash_indices, - connection_id, - peer_id: generate_peer_id(), - port: Port::new(rng.gen()), - } -} - -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, gamma: Gamma) -> usize { - gamma_usize(rng, gamma, config.requests.number_of_torrents - 1) -}