Skip to content

Commit

Permalink
driver: batch submit requests and add benchmark (tokio-rs#78)
Browse files Browse the repository at this point in the history
Batch submit requests for fun and profit
  • Loading branch information
Noah-Kennedy authored Apr 20, 2022
1 parent 6d3ef61 commit 0fdc0c2
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 11 deletions.
41 changes: 41 additions & 0 deletions examples/wrk-bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::io;
use std::rc::Rc;
use tokio::task::JoinHandle;

pub const RESPONSE: &'static [u8] =
b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!";

pub const ADDRESS: &'static str = "127.0.0.1:8080";

fn main() -> io::Result<()> {
tokio_uring::start(async {
let mut tasks = Vec::with_capacity(16);
let listener = Rc::new(tokio_uring::net::TcpListener::bind(
ADDRESS.parse().unwrap(),
)?);

for _ in 0..16 {
let listener = listener.clone();
let task: JoinHandle<io::Result<()>> = tokio::task::spawn_local(async move {
loop {
let (stream, _) = listener.accept().await?;

tokio_uring::spawn(async move {
let (result, _) = stream.write(RESPONSE).await;

if let Err(err) = result {
eprintln!("Client connection failed: {}", err);
}
});
}
});
tasks.push(task);
}

for t in tasks {
t.await.unwrap()?;
}

Ok(())
})
}
8 changes: 4 additions & 4 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ pub(crate) struct Driver {

type Handle = Rc<RefCell<Inner>>;

struct Inner {
pub(crate) struct Inner {
/// In-flight operations
ops: Ops,

/// IoUring bindings
uring: IoUring,
pub(crate) uring: IoUring,
}

// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
struct Ops(Slab<op::Lifecycle>);

scoped_thread_local!(static CURRENT: Rc<RefCell<Inner>>);
scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);

impl Driver {
pub(crate) fn new() -> io::Result<Driver> {
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Inner {
}
}

fn submit(&mut self) -> io::Result<()> {
pub(crate) fn submit(&mut self) -> io::Result<()> {
loop {
match self.uring.submit() {
Ok(_) => {
Expand Down
6 changes: 0 additions & 6 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ impl<T> Op<T> {
}
}

// Submit the new operation. At this point, the operation has been
// pushed onto the queue and the tail pointer has been updated, so
// the submission entry is visible to the kernel. If there is an
// error here (probably EAGAIN), we still return the operation. A
// future `io_uring_enter` will fully submit the event.
let _ = inner.submit();
Ok(op)
})
}
Expand Down
8 changes: 7 additions & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::driver::Driver;
use crate::driver::{Driver, CURRENT};
use std::cell::RefCell;

use std::future::Future;
use std::io;
Expand Down Expand Up @@ -51,6 +52,11 @@ pub fn spawn<T: std::future::Future + 'static>(task: T) -> tokio::task::JoinHand
impl Runtime {
pub(crate) fn new() -> io::Result<Runtime> {
let rt = tokio::runtime::Builder::new_current_thread()
.on_thread_park(|| {
CURRENT.with(|x| {
let _ = RefCell::borrow_mut(x).uring.submit();
});
})
.enable_all()
.build()?;

Expand Down

0 comments on commit 0fdc0c2

Please sign in to comment.