diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs new file mode 100644 index 00000000..4a76ed62 --- /dev/null +++ b/examples/wrk-bench.rs @@ -0,0 +1,41 @@ +use std::io; +use std::rc::Rc; +use tokio::task::JoinHandle; + +pub const RESPONSE: &'static [u8] = + b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!"; + +pub const ADDRESS: &'static str = "127.0.0.1:8080"; + +fn main() -> io::Result<()> { + tokio_uring::start(async { + let mut tasks = Vec::with_capacity(16); + let listener = Rc::new(tokio_uring::net::TcpListener::bind( + ADDRESS.parse().unwrap(), + )?); + + for _ in 0..16 { + let listener = listener.clone(); + let task: JoinHandle> = tokio::task::spawn_local(async move { + loop { + let (stream, _) = listener.accept().await?; + + tokio_uring::spawn(async move { + let (result, _) = stream.write(RESPONSE).await; + + if let Err(err) = result { + eprintln!("Client connection failed: {}", err); + } + }); + } + }); + tasks.push(task); + } + + for t in tasks { + t.await.unwrap()?; + } + + Ok(()) + }) +} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 3505cdf7..59892732 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -44,19 +44,19 @@ pub(crate) struct Driver { type Handle = Rc>; -struct Inner { +pub(crate) struct Inner { /// In-flight operations ops: Ops, /// IoUring bindings - uring: IoUring, + pub(crate) uring: IoUring, } // When dropping the driver, all in-flight operations must have completed. This // type wraps the slab and ensures that, on drop, the slab is empty. struct Ops(Slab); -scoped_thread_local!(static CURRENT: Rc>); +scoped_thread_local!(pub(crate) static CURRENT: Rc>); impl Driver { pub(crate) fn new() -> io::Result { @@ -112,7 +112,7 @@ impl Inner { } } - fn submit(&mut self) -> io::Result<()> { + pub(crate) fn submit(&mut self) -> io::Result<()> { loop { match self.uring.submit() { Ok(_) => { diff --git a/src/driver/op.rs b/src/driver/op.rs index 266a953b..d244080e 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -88,12 +88,6 @@ impl Op { } } - // Submit the new operation. At this point, the operation has been - // pushed onto the queue and the tail pointer has been updated, so - // the submission entry is visible to the kernel. If there is an - // error here (probably EAGAIN), we still return the operation. A - // future `io_uring_enter` will fully submit the event. - let _ = inner.submit(); Ok(op) }) } diff --git a/src/runtime.rs b/src/runtime.rs index d88c3972..4b05ad41 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,4 +1,5 @@ -use crate::driver::Driver; +use crate::driver::{Driver, CURRENT}; +use std::cell::RefCell; use std::future::Future; use std::io; @@ -51,6 +52,11 @@ pub fn spawn(task: T) -> tokio::task::JoinHand impl Runtime { pub(crate) fn new() -> io::Result { let rt = tokio::runtime::Builder::new_current_thread() + .on_thread_park(|| { + CURRENT.with(|x| { + let _ = RefCell::borrow_mut(x).uring.submit(); + }); + }) .enable_all() .build()?;