Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi completion events: v2 #130

Merged
merged 24 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ struct Ops {
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
lifecycle: Slab<op::Lifecycle>,

/// Received but unserviced Op completions
completions: Slab<op::Completion>,
}

scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);
Expand Down Expand Up @@ -162,11 +165,15 @@ impl Ops {
fn new() -> Ops {
Ops {
lifecycle: Slab::with_capacity(64),
completions: Slab::with_capacity(64),
}
}

fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
self.lifecycle.get_mut(index)
fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab<op::Completion>)> {
let completions = &mut self.completions;
self.lifecycle
.get_mut(index)
.map(|lifecycle| (lifecycle, completions))
}

// Insert a new operation
Expand All @@ -180,7 +187,8 @@ impl Ops {
}

fn complete(&mut self, index: usize, cqe: op::CqeResult) {
if self.lifecycle[index].complete(cqe) {
let completions = &mut self.completions;
if self.lifecycle[index].complete(completions, cqe) {
self.lifecycle.remove(index);
}
}
Expand All @@ -189,5 +197,6 @@ impl Ops {
impl Drop for Ops {
fn drop(&mut self) {
assert!(self.lifecycle.is_empty());
assert!(self.completions.is_empty());
}
}
108 changes: 91 additions & 17 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use io_uring::{cqueue, squeue};

mod slab_list;

use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};

use crate::driver;

/// A SlabList is used to hold unserved completions.
///
/// This is relevant to multi-completion Operations,
/// which require an unknown number of CQE events to be
/// captured before completion.
pub(crate) type Completion = SlabListEntry<CqeResult>;

/// In-flight operation
pub(crate) struct Op<T: 'static> {
pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
// Driver running the operation
pub(super) driver: Rc<RefCell<driver::Inner>>,

Expand All @@ -19,8 +32,14 @@ pub(crate) struct Op<T: 'static> {

// Per-operation data
data: Option<T>,

// CqeType marker
_cqe_type: PhantomData<CqeType>,
}

/// A Marker for Ops which expect only a single completion event
pub(crate) struct SingleCQE;

pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> Self::Output;
Expand All @@ -39,12 +58,15 @@ pub(crate) enum Lifecycle {

/// The operation has completed with a single cqe result
Completed(CqeResult),

/// One or more completion results have been recieved
/// This holds the indices uniquely identifying the list within the slab
CompletionList(SlabListIndices),
}

/// A single CQE entry
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
#[allow(dead_code)]
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) flags: u32,
}

Expand All @@ -61,7 +83,7 @@ impl From<cqueue::Entry> for CqeResult {
}
}

impl<T> Op<T>
impl<T, CqeType> Op<T, CqeType>
where
T: Completable,
{
Expand All @@ -71,6 +93,7 @@ where
driver: inner_rc.clone(),
index: inner.ops.insert(),
data: Some(data),
_cqe_type: PhantomData,
}
}

Expand Down Expand Up @@ -115,7 +138,7 @@ where
}
}

impl<T> Future for Op<T>
impl<T> Future for Op<T, SingleCQE>
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
where
T: Unpin + 'static + Completable,
{
Expand All @@ -126,7 +149,11 @@ where

let me = &mut *self;
let mut inner = me.driver.borrow_mut();
let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state");
let lifecycle = inner
.ops
.get_mut(me.index)
.expect("invalid internal state")
.0;

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted => {
Expand All @@ -147,46 +174,92 @@ where
me.index = usize::MAX;
Poll::Ready(me.data.take().unwrap().complete(cqe))
}
Lifecycle::CompletionList(..) => {
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
unreachable!("No more flag set for SinglCQE")
FrankReh marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

impl<T> Drop for Op<T> {
impl<T, CqeType> Drop for Op<T, CqeType> {
fn drop(&mut self) {
use std::mem;

let mut inner = self.driver.borrow_mut();
let lifecycle = match inner.ops.get_mut(self.index) {
Some(lifecycle) => lifecycle,
let (lifecycle, completions) = match inner.ops.get_mut(self.index) {
Some(val) => val,
None => return,
};

ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
match lifecycle {
match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
}
Lifecycle::Completed(..) => {
inner.ops.remove(self.index);
}
Lifecycle::CompletionList(indices) => {
// Deallocate list entries, recording if the more CQE's are expected
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
let more = {
let mut list = indices.into_list(completions);
io_uring::cqueue::more(list.peek_end().unwrap().flags)
// Dropping list deallocates the list entries
};
if more {
// If more are expected, we have to keep the op around
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
} else {
inner.ops.remove(self.index);
}
}
Lifecycle::Ignored(..) => unreachable!(),
}
}
}

impl Lifecycle {
pub(super) fn complete(&mut self, cqe: CqeResult) -> bool {
pub(super) fn complete(&mut self, completions: &mut Slab<Completion>, cqe: CqeResult) -> bool {
use std::mem;

match mem::replace(self, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*self = Lifecycle::Completed(cqe);
x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
if io_uring::cqueue::more(cqe.flags) {
let mut list = SlabListIndices::new().into_list(completions);
list.push(cqe);
*self = Lifecycle::CompletionList(list.into_indices());
} else {
*self = Lifecycle::Completed(cqe);
}
if let Lifecycle::Waiting(waker) = x {
// wake is called whenever we have more work.
FrankReh marked this conversation as resolved.
Show resolved Hide resolved
// Also possible would be to defer calling until cqe with !more flag set
waker.wake();
}
false
}
Lifecycle::Waiting(waker) => {
*self = Lifecycle::Completed(cqe);
waker.wake();
lifecycle @ Lifecycle::Ignored(..) => {
// We must check if any more CQEs are expected before dropping
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
if io_uring::cqueue::more(cqe.flags) {
*self = lifecycle;
false
} else {
true
}
}

Lifecycle::Completed(..) => {
// Completions with more flag set go straight onto the slab,
// and are handled in Lifecycle::CompletionList.
// To construct Lifecycle::Completed, a CQE without MORE was received
// we shouldn't be receiving another.
unreachable!("invalid operation state")
}
Lifecycle::CompletionList(indices) => {
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
let mut list = indices.into_list(completions);
list.push(cqe);
*self = Lifecycle::CompletionList(list.into_indices());
false
}
Lifecycle::Ignored(..) => true,
Lifecycle::Completed(..) => unreachable!("invalid operation state"),
}
}
}
Expand Down Expand Up @@ -356,5 +429,6 @@ mod test {
fn release(driver: crate::driver::Driver) {
// Clear ops, we aren't really doing any I/O
driver.inner.borrow_mut().ops.lifecycle.clear();
driver.inner.borrow_mut().ops.completions.clear();
}
}
Loading