Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement remaining proxy tests #916

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ jobs:
submodules: true
- run: cargo run -p proto-gen -- validate

test:
name: Test
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: Install nextest
run: curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
- name: Build
run: cargo build -p qt -p quilkin --tests
- run: cargo nextest run -p qt -p quilkin

build:
name: Build
strategy:
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ tokio-stream = { version = "0.1.14", features = ["net", "sync"] }
tonic = "0.10.2"
tracing.workspace = true
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
tracing-subscriber = { workspace = true, features = ["json", "env-filter"] }
tryhard = "0.5.1"
url = { version = "2.4.1", features = ["serde"] }
uuid = { version = "1.4.1", default-features = false, features = ["v4"] }
Expand Down Expand Up @@ -186,3 +186,4 @@ tokio = { version = "1.32.0", features = [
] }
tempfile = "3.8.0"
tracing = "0.1.37"
tracing-subscriber = "0.3"
2 changes: 1 addition & 1 deletion src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Measurement for QcmpMeasurement {
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) {
let port = crate::net::socket_port(&socket);

uring_spawn!(async move {
uring_spawn!(tracing::debug_span!("qcmp"), async move {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let mut input_buf = vec![0; 1 << 16];
Expand Down
7 changes: 4 additions & 3 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
mod packet_router;
pub mod packet_router;
mod sessions;

use super::RunArgs;
use crate::{
net::{maxmind_db::IpNetEntry, xds::ResourceType},
pool::PoolBuffer,
};
use sessions::SessionPool;
pub use sessions::SessionPool;
use std::{
net::SocketAddr,
sync::{
Expand Down Expand Up @@ -208,7 +208,8 @@ impl Proxy {
&sessions,
upstream_receiver,
buffer_pool,
)?;
)
.await?;

crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone());
crate::net::phoenix::spawn(self.phoenix, config.clone(), shutdown_rx.clone())?;
Expand Down
53 changes: 31 additions & 22 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct DownstreamPacket {

/// Represents the required arguments to run a worker task that
/// processes packets received downstream.
pub(crate) struct DownstreamReceiveWorkerConfig {
pub struct DownstreamReceiveWorkerConfig {
/// ID of the worker.
pub worker_id: usize,
/// Socket with reused port from which the worker receives packets.
Expand All @@ -34,7 +34,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig {
}

impl DownstreamReceiveWorkerConfig {
pub fn spawn(self) -> Arc<tokio::sync::Notify> {
pub async fn spawn(self) -> eyre::Result<Arc<tokio::sync::Notify>> {
let Self {
worker_id,
upstream_receiver,
Expand All @@ -48,17 +48,22 @@ impl DownstreamReceiveWorkerConfig {
let notify = Arc::new(tokio::sync::Notify::new());
let is_ready = notify.clone();

uring_spawn!(async move {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let thread_span = tracing::debug_span!("receiver", id = worker_id).or_current();

let worker = uring_spawn!(thread_span, async move {
let mut last_received_at = None;
let socket = crate::net::DualStackLocalSocket::new(port)
.unwrap()
.make_refcnt();

tracing::trace!(port, "bound worker");
let send_socket = socket.clone();

let upstream = tracing::debug_span!("upstream").or_current();

uring_inner_spawn!(async move {
is_ready.notify_one();

loop {
tokio::select! {
result = upstream_receiver.recv() => {
Expand Down Expand Up @@ -103,12 +108,15 @@ impl DownstreamReceiveWorkerConfig {
}
}
}
});
}.instrument(upstream));

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let buffer = buffer_pool.clone().alloc();

let (result, contents) = socket.recv_from(buffer).await;

match result {
Ok((_size, mut source)) => {
source.set_ip(source.ip().to_canonical());
Expand Down Expand Up @@ -146,7 +154,9 @@ impl DownstreamReceiveWorkerConfig {
}
});

notify
use eyre::WrapErr as _;
worker.await.context("failed to spawn receiver task")??;
Ok(notify)
}

#[inline]
Expand Down Expand Up @@ -237,7 +247,7 @@ impl DownstreamReceiveWorkerConfig {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
pub(super) fn spawn_receivers(
pub async fn spawn_receivers(
config: Arc<Config>,
socket: socket2::Socket,
num_workers: usize,
Expand All @@ -249,21 +259,20 @@ pub(super) fn spawn_receivers(

let port = crate::net::socket_port(&socket);

let worker_notifications = (0..num_workers)
.map(|worker_id| {
let worker = DownstreamReceiveWorkerConfig {
worker_id,
upstream_receiver: upstream_receiver.clone(),
port,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
buffer_pool: buffer_pool.clone(),
};
let mut worker_notifications = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let worker = DownstreamReceiveWorkerConfig {
worker_id,
upstream_receiver: upstream_receiver.clone(),
port,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
buffer_pool: buffer_pool.clone(),
};

worker.spawn()
})
.collect();
worker_notifications.push(worker.spawn().await?);
}

tokio::spawn(async move {
let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5));
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl SessionPool {

let pool = self.clone();

let initialised = uring_spawn!(async move {
let initialised = uring_spawn!(tracing::debug_span!("session pool"), async move {
let mut last_received_at = None;
let mut shutdown_rx = pool.shutdown_rx.clone();
cfg_if::cfg_if! {
Expand Down
12 changes: 9 additions & 3 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,32 @@

/// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task.
macro_rules! uring_spawn {
($future:expr) => {{
($span:expr, $future:expr) => {{
let (tx, rx) = tokio::sync::oneshot::channel::<crate::Result<()>>();
use tracing::Instrument as _;

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
std::thread::spawn(move || {
let _guard = tracing::dispatcher::set_default(&dispatcher);

match tokio_uring::Runtime::new(&tokio_uring::builder().entries(2048)) {
Ok(runtime) => {
let _ = tx.send(Ok(()));
runtime.block_on($future);
runtime.block_on($future.instrument($span));
}
Err(error) => {
let _ = tx.send(Err(error.into()));
}
};
});
} else {
use tracing::instrument::WithSubscriber as _;
tokio::spawn(async move {
let _ = tx.send(Ok(()));
$future.await
});
}.instrument($span).with_current_subscriber());
}
}
rx
Expand Down
2 changes: 1 addition & 1 deletion test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ socket2.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing.workspace = true
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
10 changes: 10 additions & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,16 @@ impl Sandbox {
.expect("operation timed out")
}

#[inline]
pub async fn maybe_timeout<F>(&self, ms: u64, fut: F) -> Option<F::Output>
where
F: std::future::Future,
{
tokio::time::timeout(std::time::Duration::from_millis(ms), fut)
.await
.ok()
}

/// Runs a future, expecting it to timeout instead of resolving, panics if
/// the future finishes before the timeout
#[inline]
Expand Down
Loading