From 0ba316cc1e81fd37baad74006f8bd7d89a986c1b Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 26 Mar 2024 13:52:39 +0100 Subject: [PATCH 1/4] Improve logging in tests Spawning an async task or thread would lose span information, and additionally would not capture at all if there was not a global dispatcher set, which is not the case in tests as they create their own dispatcher --- src/net.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/net.rs b/src/net.rs index 1f48faaac7..78fc373d60 100644 --- a/src/net.rs +++ b/src/net.rs @@ -16,15 +16,20 @@ /// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task. macro_rules! uring_spawn { - ($future:expr) => {{ + ($span:expr, $future:expr) => {{ let (tx, rx) = tokio::sync::oneshot::channel::>(); + use tracing::Instrument as _; + cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { + let dispatcher = tracing::dispatcher::get_default(|d| d.clone()); std::thread::spawn(move || { + let _guard = tracing::dispatcher::set_default(&dispatcher); + match tokio_uring::Runtime::new(&tokio_uring::builder().entries(2048)) { Ok(runtime) => { let _ = tx.send(Ok(())); - runtime.block_on($future); + runtime.block_on($future.instrument($span)); } Err(error) => { let _ = tx.send(Err(error.into())); @@ -35,7 +40,7 @@ macro_rules! uring_spawn { tokio::spawn(async move { let _ = tx.send(Ok(())); $future.await - }); + }.instrument($span).with_current_subscriber()); } } rx From a213f6a698dd07db88ffd3a75cea21d0af0c0a30 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 26 Mar 2024 13:55:02 +0100 Subject: [PATCH 2/4] Implement reamining proxy tests --- Cargo.toml | 3 +- src/codec/qcmp.rs | 2 +- src/components/proxy.rs | 7 +- src/components/proxy/packet_router.rs | 53 +++--- src/components/proxy/sessions.rs | 2 +- test/Cargo.toml | 2 +- test/src/lib.rs | 10 ++ test/tests/proxy.rs | 233 ++++++++++++++------------ 8 files changed, 179 insertions(+), 133 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 54ed553e4b..953e97f708 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ tokio-stream = { version = "0.1.14", features = ["net", "sync"] } tonic = "0.10.2" tracing.workspace = true tracing-futures = { version = "0.2.5", features = ["futures-03"] } -tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } +tracing-subscriber = { workspace = true, features = ["json", "env-filter"] } tryhard = "0.5.1" url = { version = "2.4.1", features = ["serde"] } uuid = { version = "1.4.1", default-features = false, features = ["v4"] } @@ -186,3 +186,4 @@ tokio = { version = "1.32.0", features = [ ] } tempfile = "3.8.0" tracing = "0.1.37" +tracing-subscriber = "0.3" diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 3f1eaaeb5d..908410ecf9 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -75,7 +75,7 @@ impl Measurement for QcmpMeasurement { pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) { let port = crate::net::socket_port(&socket); - uring_spawn!(async move { + uring_spawn!(tracing::debug_span!("qcmp"), async move { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP // packet, which is the maximum value of 16 a bit integer. let mut input_buf = vec![0; 1 << 16]; diff --git a/src/components/proxy.rs b/src/components/proxy.rs index f7d1b386f6..8c18950b07 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -1,4 +1,4 @@ -mod packet_router; +pub mod packet_router; mod sessions; use super::RunArgs; @@ -6,7 +6,7 @@ use crate::{ net::{maxmind_db::IpNetEntry, xds::ResourceType}, pool::PoolBuffer, }; -use sessions::SessionPool; +pub use sessions::SessionPool; use std::{ net::SocketAddr, sync::{ @@ -208,7 +208,8 @@ impl Proxy { &sessions, upstream_receiver, buffer_pool, - )?; + ) + .await?; crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone()); crate::net::phoenix::spawn(self.phoenix, config.clone(), shutdown_rx.clone())?; diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index e48eda20f1..16da79aa93 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -21,7 +21,7 @@ struct DownstreamPacket { /// Represents the required arguments to run a worker task that /// processes packets received downstream. -pub(crate) struct DownstreamReceiveWorkerConfig { +pub struct DownstreamReceiveWorkerConfig { /// ID of the worker. pub worker_id: usize, /// Socket with reused port from which the worker receives packets. @@ -34,7 +34,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig { } impl DownstreamReceiveWorkerConfig { - pub fn spawn(self) -> Arc { + pub async fn spawn(self) -> eyre::Result> { let Self { worker_id, upstream_receiver, @@ -48,17 +48,22 @@ impl DownstreamReceiveWorkerConfig { let notify = Arc::new(tokio::sync::Notify::new()); let is_ready = notify.clone(); - uring_spawn!(async move { - // Initialize a buffer for the UDP packet. We use the maximum size of a UDP - // packet, which is the maximum value of 16 a bit integer. + let thread_span = tracing::debug_span!("receiver", id = worker_id).or_current(); + + let worker = uring_spawn!(thread_span, async move { let mut last_received_at = None; let socket = crate::net::DualStackLocalSocket::new(port) .unwrap() .make_refcnt(); + + tracing::trace!(port, "bound worker"); let send_socket = socket.clone(); + let upstream = tracing::debug_span!("upstream").or_current(); + uring_inner_spawn!(async move { is_ready.notify_one(); + loop { tokio::select! { result = upstream_receiver.recv() => { @@ -103,12 +108,15 @@ impl DownstreamReceiveWorkerConfig { } } } - }); + }.instrument(upstream)); loop { + // Initialize a buffer for the UDP packet. We use the maximum size of a UDP + // packet, which is the maximum value of 16 a bit integer. let buffer = buffer_pool.clone().alloc(); let (result, contents) = socket.recv_from(buffer).await; + match result { Ok((_size, mut source)) => { source.set_ip(source.ip().to_canonical()); @@ -146,7 +154,9 @@ impl DownstreamReceiveWorkerConfig { } }); - notify + use eyre::WrapErr as _; + worker.await.context("failed to spawn receiver task")??; + Ok(notify) } #[inline] @@ -237,7 +247,7 @@ impl DownstreamReceiveWorkerConfig { /// This function also spawns the set of worker tasks responsible for consuming packets /// off the aforementioned queue and processing them through the filter chain and session /// pipeline. -pub(super) fn spawn_receivers( +pub async fn spawn_receivers( config: Arc, socket: socket2::Socket, num_workers: usize, @@ -249,21 +259,20 @@ pub(super) fn spawn_receivers( let port = crate::net::socket_port(&socket); - let worker_notifications = (0..num_workers) - .map(|worker_id| { - let worker = DownstreamReceiveWorkerConfig { - worker_id, - upstream_receiver: upstream_receiver.clone(), - port, - config: config.clone(), - sessions: sessions.clone(), - error_sender: error_sender.clone(), - buffer_pool: buffer_pool.clone(), - }; + let mut worker_notifications = Vec::with_capacity(num_workers); + for worker_id in 0..num_workers { + let worker = DownstreamReceiveWorkerConfig { + worker_id, + upstream_receiver: upstream_receiver.clone(), + port, + config: config.clone(), + sessions: sessions.clone(), + error_sender: error_sender.clone(), + buffer_pool: buffer_pool.clone(), + }; - worker.spawn() - }) - .collect(); + worker_notifications.push(worker.spawn().await?); + } tokio::spawn(async move { let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5)); diff --git a/src/components/proxy/sessions.rs b/src/components/proxy/sessions.rs index 553d073162..1d47217405 100644 --- a/src/components/proxy/sessions.rs +++ b/src/components/proxy/sessions.rs @@ -113,7 +113,7 @@ impl SessionPool { let pool = self.clone(); - let initialised = uring_spawn!(async move { + let initialised = uring_spawn!(tracing::debug_span!("session pool"), async move { let mut last_received_at = None; let mut shutdown_rx = pool.shutdown_rx.clone(); cfg_if::cfg_if! { diff --git a/test/Cargo.toml b/test/Cargo.toml index 342f34e160..6ec6af36ea 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -14,4 +14,4 @@ socket2.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["macros"] } tracing.workspace = true -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/test/src/lib.rs b/test/src/lib.rs index 0858f1cc31..8a55a71dbc 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -671,6 +671,16 @@ impl Sandbox { .expect("operation timed out") } + #[inline] + pub async fn maybe_timeout(&self, ms: u64, fut: F) -> Option + where + F: std::future::Future, + { + tokio::time::timeout(std::time::Duration::from_millis(ms), fut) + .await + .ok() + } + /// Runs a future, expecting it to timeout instead of resolving, panics if /// the future finishes before the timeout #[inline] diff --git a/test/tests/proxy.rs b/test/tests/proxy.rs index 49d5f52481..d8f77093ea 100644 --- a/test/tests/proxy.rs +++ b/test/tests/proxy.rs @@ -1,7 +1,8 @@ use qt::*; use quilkin::test::TestConfig; +use tracing::Instrument as _; -trace_test!(run_server, { +trace_test!(server, { let mut sc = qt::sandbox_config!(); sc.push("server1", ServerPailConfig::default(), &[]); @@ -35,7 +36,7 @@ trace_test!(run_server, { ); }); -trace_test!(run_client, { +trace_test!(client, { let mut sc = qt::sandbox_config!(); sc.push("dest", ServerPailConfig::default(), &[]); @@ -53,7 +54,7 @@ trace_test!(run_client, { assert_eq!(msg, sb.timeout(100, dest_rx.recv()).await.unwrap(),); }); -trace_test!(run_with_filter, { +trace_test!(with_filter, { let mut sc = qt::sandbox_config!(); sc.push("server", ServerPailConfig::default(), &[]); @@ -79,104 +80,128 @@ trace_test!(run_with_filter, { assert!(result.starts_with(&format!("{msg}:odr:[::1]:"))); }); -// trace_test!(spawn_downstream_receive_workers, { -// let mut sc = qt::sandbox_config!(); - -// sc.push("server", ServerPailConfig::default(), &[]); -// let mut sb = sc.spinup().await; - -// let (mut packet_rx, endpoint) = sb.server("server"); - -// let (error_sender, mut error_receiver) = tokio::sync::mpsc::unbounded_channel(); - -// tokio::task::spawn(async move { -// while let Some(error) = error_receiver.recv().await { -// tracing::error!(%error, "error sent from DownstreamReceiverWorker"); -// } -// }); - -// let config = std::sync::Arc::new(quilkin::Config::default_non_agent()); -// config -// .clusters -// .modify(|clusters| clusters.insert_default([endpoint.into()].into())); -// let (tx, rx) = async_channel::unbounded(); -// let (_shutdown_tx, shutdown_rx) = -// quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); - -// let socket = sb.client(); -// let addr = socket.local_addr().unwrap(); - -// // we'll test a single DownstreamReceiveWorkerConfig -// let ready = quilkin::components::proxy::packet_router::DownstreamReceiveWorkerConfig { -// worker_id: 1, -// port: addr.port(), -// upstream_receiver: rx.clone(), -// config: config.clone(), -// error_sender, -// buffer_pool: quilkin::test::BUFFER_POOL.clone(), -// sessions: quilkin::components::proxy::SessionPool::new( -// config, -// tx, -// BUFFER_POOL.clone(), -// shutdown_rx, -// ), -// } -// .spawn(); - -// sb.timeout(500, ready.notified()).await; - -// let msg = "hello-downstream"; - -// socket.send_to(msg.as_bytes(), addr).await.unwrap(); - -// assert_eq!( -// msg, -// sb.timeout(100, packet_rx.recv()) -// .await -// .expect("should receive a packet") -// ); -// }); - -// #[tokio::test] -// async fn run_recv_from() { -// let t = TestHelper::default(); - -// let msg = "hello"; -// let endpoint = t.open_socket_and_recv_single_packet().await; -// let local_addr = available_addr(&AddressType::Random).await; -// let proxy = crate::cli::Proxy { -// port: local_addr.port(), -// qcmp_port: 0, -// ..<_>::default() -// }; - -// let config = Arc::new(crate::Config::default_non_agent()); -// config.clusters.modify(|clusters| { -// clusters.insert_default( -// [crate::net::endpoint::Endpoint::from( -// endpoint.socket.local_addr().unwrap(), -// )] -// .into(), -// ) -// }); - -// let (tx, rx) = async_channel::unbounded(); -// let (_shutdown_tx, shutdown_rx) = crate::make_shutdown_channel(crate::ShutdownKind::Testing); - -// let sessions = SessionPool::new(config.clone(), tx, BUFFER_POOL.clone(), shutdown_rx); - -// proxy -// .run_recv_from(&config, proxy.port, 1, &sessions, rx, BUFFER_POOL.clone()) -// .unwrap(); -// tokio::time::sleep(Duration::from_millis(500)).await; - -// let socket = create_socket().await; -// socket.send_to(msg.as_bytes(), &local_addr).await.unwrap(); -// assert_eq!( -// msg, -// timeout(Duration::from_secs(1), endpoint.packet_rx) -// .await -// .expect("should receive a packet") -// .unwrap() -// ); -// } +trace_test!(uring_receiver, { + let mut sc = qt::sandbox_config!(); + + sc.push("server", ServerPailConfig::default(), &[]); + let mut sb = sc.spinup().await; + + let (mut packet_rx, endpoint) = sb.server("server"); + + let (error_sender, mut error_receiver) = tokio::sync::mpsc::unbounded_channel(); + + tokio::task::spawn( + async move { + while let Some(error) = error_receiver.recv().await { + tracing::error!(%error, "error sent from DownstreamReceiverWorker"); + } + } + .instrument(tracing::debug_span!("error rx")), + ); + + let config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + config + .clusters + .modify(|clusters| clusters.insert_default([endpoint.into()].into())); + let (tx, rx) = async_channel::unbounded(); + let (_shutdown_tx, shutdown_rx) = + quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); + + let socket = sb.client(); + let (ws, addr) = sb.socket(); + + // we'll test a single DownstreamReceiveWorkerConfig + let ready = quilkin::components::proxy::packet_router::DownstreamReceiveWorkerConfig { + worker_id: 1, + port: addr.port(), + upstream_receiver: rx.clone(), + config: config.clone(), + error_sender, + buffer_pool: quilkin::test::BUFFER_POOL.clone(), + sessions: quilkin::components::proxy::SessionPool::new( + config, + tx, + BUFFER_POOL.clone(), + shutdown_rx, + ), + } + .spawn() + .await + .expect("failed to spawn task"); + + // Drop the socket, otherwise it can + drop(ws); + + sb.timeout(500, ready.notified()).await; + + let msg = "hello-downstream"; + tracing::debug!("sending packet"); + socket.send_to(msg.as_bytes(), addr).await.unwrap(); + assert_eq!(msg, sb.timeout(200, packet_rx.recv()).await.unwrap()); +}); + +trace_test!(recv_from, { + let mut sc = qt::sandbox_config!(); + + sc.push("server", ServerPailConfig::default(), &[]); + let mut sb = sc.spinup().await; + + let (mut packet_rx, endpoint) = sb.server("server"); + + let config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + config + .clusters + .modify(|clusters| clusters.insert_default([endpoint.into()].into())); + + let (tx, rx) = async_channel::unbounded(); + let (_shutdown_tx, shutdown_rx) = + quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); + + let sessions = quilkin::components::proxy::SessionPool::new( + config.clone(), + tx, + BUFFER_POOL.clone(), + shutdown_rx, + ); + + const WORKER_COUNT: usize = 3; + + let (socket, addr) = sb.socket(); + let workers = quilkin::components::proxy::packet_router::spawn_receivers( + config, + socket, + WORKER_COUNT, + &sessions, + rx, + BUFFER_POOL.clone(), + ) + .await + .unwrap(); + + for wn in workers { + sb.timeout(200, wn.notified()).await; + } + + let socket = std::sync::Arc::new(sb.client()); + let msg = "recv-from"; + + let mut tasks = tokio::task::JoinSet::new(); + + for _ in 0..WORKER_COUNT { + let ss = socket.clone(); + tasks.spawn(async move { ss.send_to(msg.as_bytes(), addr).await.unwrap() }); + } + + while let Some(res) = tasks.join_next().await { + assert_eq!(res.unwrap(), msg.len()); + } + + for _ in 0..WORKER_COUNT { + assert_eq!( + msg, + sb.timeout(20, packet_rx.recv()) + .await + .expect("should receive a packet") + ); + } +}); From c1194549a134f8ebd8106ebd71810cec9dcadb55 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 26 Mar 2024 14:08:27 +0100 Subject: [PATCH 3/4] Fix non-linux --- src/net.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/net.rs b/src/net.rs index 78fc373d60..f5fd36c430 100644 --- a/src/net.rs +++ b/src/net.rs @@ -37,6 +37,7 @@ macro_rules! uring_spawn { }; }); } else { + use tracing::instrument::WithSubscriber as _; tokio::spawn(async move { let _ = tx.send(Ok(())); $future.await From aeed42ff07c56af87953c383ee2d5e154fbc4af8 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 26 Mar 2024 14:11:19 +0100 Subject: [PATCH 4/4] Run linux tests in github actions for quicker feedback --- .github/workflows/ci.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 91083e69f1..94faf2e372 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,6 +58,19 @@ jobs: submodules: true - run: cargo run -p proto-gen -- validate + test: + name: Test + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - name: Install nextest + run: curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin + - name: Build + run: cargo build -p qt -p quilkin --tests + - run: cargo nextest run -p qt -p quilkin + build: name: Build strategy: