Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi completion events: v2 #130

Merged
merged 24 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ pub(crate) struct Inner {
pub(crate) uring: IoUring,
}

// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
struct Ops(Slab<op::Lifecycle>);
struct Ops {
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
lifecycle: Slab<op::Lifecycle>,

/// Received but unserviced Op completions
completions: Slab<op::Completion>,
}

scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);

Expand Down Expand Up @@ -95,7 +100,7 @@ impl Driver {

fn num_operations(&self) -> usize {
let inner = self.inner.borrow();
inner.ops.0.len()
inner.ops.lifecycle.len()
}
}

Expand Down Expand Up @@ -155,33 +160,41 @@ impl Drop for Driver {

impl Ops {
fn new() -> Ops {
Ops(Slab::with_capacity(64))
Ops {
lifecycle: Slab::with_capacity(64),
completions: Slab::with_capacity(64),
}
}

fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
self.0.get_mut(index)
fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab<op::Completion>)> {
let completions = &mut self.completions;
self.lifecycle
.get_mut(index)
.map(|lifecycle| (lifecycle, completions))
}

// Insert a new operation
fn insert(&mut self) -> usize {
self.0.insert(op::Lifecycle::Submitted)
self.lifecycle.insert(op::Lifecycle::Submitted)
}

// Remove an operation
fn remove(&mut self, index: usize) {
self.0.remove(index);
self.lifecycle.remove(index);
}

fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
if self.0[index].complete(result, flags) {
self.0.remove(index);
let completions = &mut self.completions;
if self.lifecycle[index].complete(completions, result, flags) {
self.lifecycle.remove(index);
}
}
}

impl Drop for Ops {
fn drop(&mut self) {
assert!(self.0.is_empty());
assert!(self.lifecycle.is_empty());
assert!(self.completions.is_empty());
}
}

Expand Down
98 changes: 76 additions & 22 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use io_uring::squeue;

mod completion;
pub(crate) use completion::*;
use slab::Slab;

use crate::driver;

/// In-flight operation
pub(crate) struct Op<T: 'static> {
pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
// Driver running the operation
pub(super) driver: Rc<RefCell<driver::Inner>>,

Expand All @@ -19,8 +24,14 @@ pub(crate) struct Op<T: 'static> {

// Per-operation data
data: Option<T>,

// CqeType marker
_cqe_type: PhantomData<CqeType>,
}

/// A Marker for Ops which expect only a single completion event
pub(crate) struct SingleCQE;

pub(crate) trait Completable {
type Output;
fn complete(self, result: io::Result<u32>, flags: u32) -> Self::Output;
Expand All @@ -37,28 +48,33 @@ pub(crate) enum Lifecycle {
/// must be passed to the driver and held until the operation completes.
Ignored(Box<dyn std::any::Any>),

/// The operation has completed.
/// The operation has completed with a single result
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
Completed(io::Result<u32>, u32),

/// One or more completion results have been recieved
/// This holds the indices uniquely identifying the list within the slab
CompletionList(CompletionIndices),
}

impl<T> Op<T>
impl<T, CqeType> Op<T, CqeType>
where
T: Completable,
{
/// Create a new operation
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Op<T> {
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Self {
Op {
driver: inner_rc.clone(),
index: inner.ops.insert(),
data: Some(data),
_cqe_type: PhantomData,
}
}

/// Submit an operation to uring.
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
Expand Down Expand Up @@ -91,7 +107,7 @@ where
}

/// Try submitting an operation to uring
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
Expand All @@ -103,7 +119,7 @@ where
}
}

impl<T> Future for Op<T>
impl<T> Future for Op<T, SingleCQE>
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
where
T: Unpin + 'static + Completable,
{
Expand All @@ -114,7 +130,11 @@ where

let me = &mut *self;
let mut inner = me.driver.borrow_mut();
let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state");
let lifecycle = inner
.ops
.get_mut(me.index)
.expect("invalid internal state")
.0;

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted => {
Expand All @@ -133,49 +153,82 @@ where
Lifecycle::Completed(result, flags) => {
inner.ops.remove(me.index);
me.index = usize::MAX;

Poll::Ready(me.data.take().unwrap().complete(result, flags))
}
Lifecycle::CompletionList(..) => {
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
unreachable!()
}
}
}
}

impl<T> Drop for Op<T> {
impl<T, CqeType> Drop for Op<T, CqeType> {
fn drop(&mut self) {
use std::mem;

let mut inner = self.driver.borrow_mut();
let lifecycle = match inner.ops.get_mut(self.index) {
Some(lifecycle) => lifecycle,
let (lifecycle, completions) = match inner.ops.get_mut(self.index) {
Some(val) => val,
None => return,
};

ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
match lifecycle {
match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
}
Lifecycle::Completed(..) => {
inner.ops.remove(self.index);
}
Lifecycle::CompletionList(indices) => {
// Deallocate list entries, recording if the more CQE's are expected
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
let more = {
let mut list = indices.into_list(completions);
io_uring::cqueue::more(list.peek_end().unwrap().1)
// Dropping list dealloctes the list entries
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
};
if more {
// If more are expected, we have to keep the op around
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
} else {
inner.ops.remove(self.index);
}
}
Lifecycle::Ignored(..) => unreachable!(),
}
}
}

impl Lifecycle {
pub(super) fn complete(&mut self, result: io::Result<u32>, flags: u32) -> bool {
pub(super) fn complete(
&mut self,
completions: &mut Slab<Completion>,
result: io::Result<u32>,
flags: u32,
) -> bool {
use std::mem;

match mem::replace(self, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*self = Lifecycle::Completed(result, flags);
false
}
Lifecycle::Waiting(waker) => {
*self = Lifecycle::Completed(result, flags);
waker.wake();
x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
if io_uring::cqueue::more(flags) {
let mut list = CompletionIndices::new().into_list(completions);
list.push((result, flags));
*self = Lifecycle::CompletionList(list.into_indices());
} else {
*self = Lifecycle::Completed(result, flags);
}
if let Lifecycle::Waiting(waker) = x {
waker.wake();
}
false
}
Lifecycle::Ignored(..) => true,
Lifecycle::Completed(..) => unreachable!("invalid operation state"),
Lifecycle::CompletionList(indices) => {
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
let mut list = indices.into_list(completions);
list.push((result, flags));
*self = Lifecycle::CompletionList(list.into_indices());
false
}
}
}
}
Expand Down Expand Up @@ -339,6 +392,7 @@ mod test {

fn release(driver: crate::driver::Driver) {
// Clear ops, we aren't really doing any I/O
driver.inner.borrow_mut().ops.0.clear();
driver.inner.borrow_mut().ops.lifecycle.clear();
driver.inner.borrow_mut().ops.completions.clear();
}
}
Loading