From 47452dff39d56138b4ed1080c1a410187405dc15 Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 16:31:43 +0100 Subject: [PATCH 01/17] Add Marker SingleCqe This is used to parameterize Op. It is now possible to specialize blanket implementations, such as Future, to classes of Ops --- src/driver/op.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/driver/op.rs b/src/driver/op.rs index c2757256..8186a15a 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -1,6 +1,7 @@ use std::cell::RefCell; use std::future::Future; use std::io; +use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, Waker}; @@ -10,7 +11,7 @@ use io_uring::squeue; use crate::driver; /// In-flight operation -pub(crate) struct Op { +pub(crate) struct Op { // Driver running the operation pub(super) driver: Rc>, @@ -19,8 +20,14 @@ pub(crate) struct Op { // Per-operation data data: Option, + + // CqeType marker + _cqe_type: PhantomData } +/// A Marker for Ops which expect only a single completion event +pub(crate) struct SingleCQE; + pub(crate) trait Completable { type Output; fn complete(self, result: io::Result, flags: u32) -> Self::Output; @@ -41,16 +48,17 @@ pub(crate) enum Lifecycle { Completed(io::Result, u32), } -impl Op +impl Op where T: Completable, { /// Create a new operation - fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Op { + fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Self { Op { driver: inner_rc.clone(), index: inner.ops.insert(), data: Some(data), + _cqe_type: PhantomData, } } @@ -58,7 +66,7 @@ where /// /// `state` is stored during the operation tracking any state submitted to /// the kernel. 
- pub(super) fn submit_with(data: T, f: F) -> io::Result> + pub(super) fn submit_with(data: T, f: F) -> io::Result where F: FnOnce(&mut T) -> squeue::Entry, { @@ -91,7 +99,7 @@ where } /// Try submitting an operation to uring - pub(super) fn try_submit_with(data: T, f: F) -> io::Result> + pub(super) fn try_submit_with(data: T, f: F) -> io::Result where F: FnOnce(&mut T) -> squeue::Entry, { @@ -103,7 +111,7 @@ where } } -impl Future for Op +impl Future for Op where T: Unpin + 'static + Completable, { From 829e7df5c7d2d38f0748df191b0f5264d1d206ee Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 16:34:08 +0100 Subject: [PATCH 02/17] Add CompletionList Adds infrastructure to store Completions in a slab backed indexed linked list --- src/driver/op.rs | 4 ++ src/driver/op/completion.rs | 114 ++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 src/driver/op/completion.rs diff --git a/src/driver/op.rs b/src/driver/op.rs index 8186a15a..f5b94a8b 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -8,6 +8,10 @@ use std::task::{Context, Poll, Waker}; use io_uring::squeue; +mod completion; +pub(crate) use completion::*; +use slab::Slab; + use crate::driver; /// In-flight operation diff --git a/src/driver/op/completion.rs b/src/driver/op/completion.rs new file mode 100644 index 00000000..a630a10a --- /dev/null +++ b/src/driver/op/completion.rs @@ -0,0 +1,114 @@ + +use std::{io, ops::{Deref, DerefMut}}; +use slab::Slab; + +/// A linked list of CQE events +pub(crate) struct CompletionList<'a> { + index: CompletionIndices, + completions: &'a mut Slab, +} + +// Index to the first and last Completion of a single list held in the slab +pub(crate) struct CompletionIndices { + start: usize, + end: usize, +} + +/// Multi cycle operations may return an unbounded number of CQE's +/// for a single cycle SQE. 
+/// +/// These are held in an indexed linked list +pub(crate) struct Completion { + val: (io::Result, u32), + next: usize, + prev: usize, +} + +impl Deref for Completion { + type Target = (io::Result, u32); + + fn deref(&self) -> &Self::Target { + &self.val + } +} + +impl DerefMut for Completion { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.val + } +} + + +impl CompletionIndices{ + pub(crate) fn new() -> Self { + let start = usize::MAX; + CompletionIndices { start, end:start } + } + + pub(crate) fn into_list<'a>(self, completions: &'a mut Slab) -> CompletionList<'a> { + CompletionList::from_indices(self, completions) + } + +} + + +impl<'a> CompletionList<'a> { + pub(crate) fn from_indices(index: CompletionIndices, completions: &'a mut Slab) -> Self { + CompletionList { completions, index } + } + + pub(crate) fn is_empty(&self) -> bool{ + self.index.start == usize::MAX + } + + + /// Peek at the end of the list (most recently pushed) + /// This leaves the list unchanged + pub(crate) fn peek_end(&mut self) -> Option<&(io::Result, u32)> { + if self.index.end == usize::MAX { + None + } else { + Some(&self.completions[self.index.end].val) + } + } + + /// Pop from front of list + #[allow(dead_code)] + pub(crate) fn pop(&mut self) -> Option<(io::Result, u32)> { + self.completions.try_remove(self.index.start).map(| Completion{next, val,..} |{ + if next != usize::MAX { + self.completions[next].prev = usize::MAX; + } else { + self.index.end = usize::MAX; + } + self.index.start = next; + val + }) + } + + /// Push to the end of the list + pub(crate) fn push(&mut self, val: (io::Result, u32)) { + let prev = self.index.end; + let completion = Completion { + val, + next: usize::MAX, + prev, + }; + self.index.end = self.completions.insert(completion); + self.completions[prev].next = self.index.end; + } + + /// Consume the list, without dropping entries, returning just the start and end indices + pub(crate) fn into_indices(mut self) -> CompletionIndices { + 
std::mem::replace(&mut self.index, CompletionIndices::new()) + } +} + +impl<'a> Drop for CompletionList<'a> { + fn drop(&mut self) { + while !self.is_empty() { + let removed = self.completions.remove(self.index.start); + self.index.start = removed.next; + } + } +} \ No newline at end of file From cfe82f64c48c2751744d434816716771547c540e Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 16:37:56 +0100 Subject: [PATCH 03/17] Add CompletionList variant Provides the infrastructure for receiving multiple CQE's for a single Op's lifecycle --- src/driver/mod.rs | 35 ++++++++++++++++--------- src/driver/op.rs | 66 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 73 insertions(+), 28 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index c2813d96..f773a4ea 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -58,9 +58,14 @@ pub(crate) struct Inner { pub(crate) uring: IoUring, } -// When dropping the driver, all in-flight operations must have completed. This -// type wraps the slab and ensures that, on drop, the slab is empty. -struct Ops(Slab); +struct Ops{ + // When dropping the driver, all in-flight operations must have completed. This + // type wraps the slab and ensures that, on drop, the slab is empty. 
+ lifecycle: Slab, + + /// Received but unserviced Op completions + completions: Slab +} scoped_thread_local!(pub(crate) static CURRENT: Rc>); @@ -95,7 +100,7 @@ impl Driver { fn num_operations(&self) -> usize { let inner = self.inner.borrow(); - inner.ops.0.len() + inner.ops.lifecycle.len() } } @@ -155,33 +160,39 @@ impl Drop for Driver { impl Ops { fn new() -> Ops { - Ops(Slab::with_capacity(64)) + Ops{ + lifecycle: Slab::with_capacity(64), + completions: Slab::with_capacity(64) + } } - fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> { - self.0.get_mut(index) + fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab)> { + let completions = &mut self.completions; + self.lifecycle.get_mut(index).map(|lifecycle| (lifecycle, completions)) } // Insert a new operation fn insert(&mut self) -> usize { - self.0.insert(op::Lifecycle::Submitted) + self.lifecycle.insert(op::Lifecycle::Submitted) } // Remove an operation fn remove(&mut self, index: usize) { - self.0.remove(index); + self.lifecycle.remove(index); } fn complete(&mut self, index: usize, result: io::Result, flags: u32) { - if self.0[index].complete(result, flags) { - self.0.remove(index); + let completions = &mut self.completions; + if self.lifecycle[index].complete(completions, result, flags) { + self.lifecycle.remove(index); } } } impl Drop for Ops { fn drop(&mut self) { - assert!(self.0.is_empty()); + assert!(self.lifecycle.is_empty()); + assert!(self.completions.is_empty()); } } diff --git a/src/driver/op.rs b/src/driver/op.rs index f5b94a8b..d59a4e7e 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -48,8 +48,11 @@ pub(crate) enum Lifecycle { /// must be passed to the driver and held until the operation completes. Ignored(Box), - /// The operation has completed. 
+ /// The operation has completed with a single result Completed(io::Result, u32), + + /// One or more completion results have been recieved + CompletionIndices(CompletionIndices), } impl Op @@ -126,7 +129,7 @@ where let me = &mut *self; let mut inner = me.driver.borrow_mut(); - let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state"); + let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state").0; match mem::replace(lifecycle, Lifecycle::Submitted) { Lifecycle::Submitted => { @@ -145,53 +148,83 @@ where Lifecycle::Completed(result, flags) => { inner.ops.remove(me.index); me.index = usize::MAX; - Poll::Ready(me.data.take().unwrap().complete(result, flags)) } + Lifecycle::CompletionIndices(..) => { + unreachable!() + } } } } -impl Drop for Op { +impl Drop for Op { fn drop(&mut self) { + use std::mem; + let mut inner = self.driver.borrow_mut(); - let lifecycle = match inner.ops.get_mut(self.index) { - Some(lifecycle) => lifecycle, + let (lifecycle, completions) = match inner.ops.get_mut(self.index) { + Some(val) => val, None => return, }; - match lifecycle { + match mem::replace(lifecycle, Lifecycle::Submitted) { Lifecycle::Submitted | Lifecycle::Waiting(_) => { *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); } Lifecycle::Completed(..) => { inner.ops.remove(self.index); } + Lifecycle::CompletionIndices(indices) => { + // Deallocate list entries, recording if the more CQE's are expected + let more = { + let mut list = indices.into_list(completions); + io_uring::cqueue::more(list.peek_end().unwrap().1) + // Dropping list dealloctes the list entries + }; + if more { + // If more are expected, we have to keep the op around + *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); + } else { + inner.ops.remove(self.index); + } + + } Lifecycle::Ignored(..) 
=> unreachable!(), } } } impl Lifecycle { - pub(super) fn complete(&mut self, result: io::Result, flags: u32) -> bool { + pub(super) fn complete(&mut self, completions: &mut Slab, result: io::Result, flags: u32) -> bool { use std::mem; match mem::replace(self, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *self = Lifecycle::Completed(result, flags); - false - } - Lifecycle::Waiting(waker) => { - *self = Lifecycle::Completed(result, flags); - waker.wake(); + x@Lifecycle::Submitted | x@Lifecycle::Waiting(..) => { + if io_uring::cqueue::more(flags){ + let mut list = CompletionIndices::new().into_list(completions); + list.push((result, flags)); + *self = Lifecycle::CompletionIndices(list.into_indices()); + } else { + *self = Lifecycle::Completed(result, flags); + } + if let Lifecycle::Waiting(waker) = x { + waker.wake(); + } false } Lifecycle::Ignored(..) => true, Lifecycle::Completed(..) => unreachable!("invalid operation state"), + Lifecycle::CompletionIndices(indices) => { + let mut list = indices.into_list(completions); + list.push((result, flags)); + *self = Lifecycle::CompletionIndices(list.into_indices()); + false + }, } } } + #[cfg(test)] mod test { use std::rc::Rc; @@ -351,6 +384,7 @@ mod test { fn release(driver: crate::driver::Driver) { // Clear ops, we aren't really doing any I/O - driver.inner.borrow_mut().ops.0.clear(); + driver.inner.borrow_mut().ops.lifecycle.clear(); + driver.inner.borrow_mut().ops.completions.clear(); } } From de50370c7bd96315c23b74c2fa2805d12f653e10 Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 16:42:11 +0100 Subject: [PATCH 04/17] fmt --- src/driver/mod.rs | 12 +++++----- src/driver/op.rs | 38 ++++++++++++++++++------------- src/driver/op/completion.rs | 45 ++++++++++++++++++++----------------- 3 files changed, 54 insertions(+), 41 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index f773a4ea..d5492e9c 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -58,13 +58,13 @@ 
pub(crate) struct Inner { pub(crate) uring: IoUring, } -struct Ops{ +struct Ops { // When dropping the driver, all in-flight operations must have completed. This // type wraps the slab and ensures that, on drop, the slab is empty. lifecycle: Slab, /// Received but unserviced Op completions - completions: Slab + completions: Slab, } scoped_thread_local!(pub(crate) static CURRENT: Rc>); @@ -160,15 +160,17 @@ impl Drop for Driver { impl Ops { fn new() -> Ops { - Ops{ + Ops { lifecycle: Slab::with_capacity(64), - completions: Slab::with_capacity(64) + completions: Slab::with_capacity(64), } } fn get_mut(&mut self, index: usize) -> Option<(&mut op::Lifecycle, &mut Slab)> { let completions = &mut self.completions; - self.lifecycle.get_mut(index).map(|lifecycle| (lifecycle, completions)) + self.lifecycle + .get_mut(index) + .map(|lifecycle| (lifecycle, completions)) } // Insert a new operation diff --git a/src/driver/op.rs b/src/driver/op.rs index d59a4e7e..4e136ec7 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -26,7 +26,7 @@ pub(crate) struct Op { data: Option, // CqeType marker - _cqe_type: PhantomData + _cqe_type: PhantomData, } /// A Marker for Ops which expect only a single completion event @@ -52,10 +52,11 @@ pub(crate) enum Lifecycle { Completed(io::Result, u32), /// One or more completion results have been recieved - CompletionIndices(CompletionIndices), + /// This holds the indices uniquely identifying the list within the slab + CompletionList(CompletionIndices), } -impl Op +impl Op where T: Completable, { @@ -129,7 +130,11 @@ where let me = &mut *self; let mut inner = me.driver.borrow_mut(); - let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state").0; + let lifecycle = inner + .ops + .get_mut(me.index) + .expect("invalid internal state") + .0; match mem::replace(lifecycle, Lifecycle::Submitted) { Lifecycle::Submitted => { @@ -150,7 +155,7 @@ where me.index = usize::MAX; Poll::Ready(me.data.take().unwrap().complete(result, flags)) } 
- Lifecycle::CompletionIndices(..) => { + Lifecycle::CompletionList(..) => { unreachable!() } } @@ -174,7 +179,7 @@ impl Drop for Op { Lifecycle::Completed(..) => { inner.ops.remove(self.index); } - Lifecycle::CompletionIndices(indices) => { + Lifecycle::CompletionList(indices) => { // Deallocate list entries, recording if the more CQE's are expected let more = { let mut list = indices.into_list(completions); @@ -187,7 +192,6 @@ impl Drop for Op { } else { inner.ops.remove(self.index); } - } Lifecycle::Ignored(..) => unreachable!(), } @@ -195,15 +199,20 @@ impl Drop for Op { } impl Lifecycle { - pub(super) fn complete(&mut self, completions: &mut Slab, result: io::Result, flags: u32) -> bool { + pub(super) fn complete( + &mut self, + completions: &mut Slab, + result: io::Result, + flags: u32, + ) -> bool { use std::mem; match mem::replace(self, Lifecycle::Submitted) { - x@Lifecycle::Submitted | x@Lifecycle::Waiting(..) => { - if io_uring::cqueue::more(flags){ + x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => { + if io_uring::cqueue::more(flags) { let mut list = CompletionIndices::new().into_list(completions); list.push((result, flags)); - *self = Lifecycle::CompletionIndices(list.into_indices()); + *self = Lifecycle::CompletionList(list.into_indices()); } else { *self = Lifecycle::Completed(result, flags); } @@ -214,17 +223,16 @@ impl Lifecycle { } Lifecycle::Ignored(..) => true, Lifecycle::Completed(..) 
=> unreachable!("invalid operation state"), - Lifecycle::CompletionIndices(indices) => { + Lifecycle::CompletionList(indices) => { let mut list = indices.into_list(completions); list.push((result, flags)); - *self = Lifecycle::CompletionIndices(list.into_indices()); + *self = Lifecycle::CompletionList(list.into_indices()); false - }, + } } } } - #[cfg(test)] mod test { use std::rc::Rc; diff --git a/src/driver/op/completion.rs b/src/driver/op/completion.rs index a630a10a..51a712bf 100644 --- a/src/driver/op/completion.rs +++ b/src/driver/op/completion.rs @@ -1,6 +1,8 @@ - -use std::{io, ops::{Deref, DerefMut}}; use slab::Slab; +use std::{ + io, + ops::{Deref, DerefMut}, +}; /// A linked list of CQE events pub(crate) struct CompletionList<'a> { @@ -38,30 +40,29 @@ impl DerefMut for Completion { } } - -impl CompletionIndices{ +impl CompletionIndices { pub(crate) fn new() -> Self { let start = usize::MAX; - CompletionIndices { start, end:start } + CompletionIndices { start, end: start } } pub(crate) fn into_list<'a>(self, completions: &'a mut Slab) -> CompletionList<'a> { CompletionList::from_indices(self, completions) } - } - impl<'a> CompletionList<'a> { - pub(crate) fn from_indices(index: CompletionIndices, completions: &'a mut Slab) -> Self { - CompletionList { completions, index } + pub(crate) fn from_indices( + index: CompletionIndices, + completions: &'a mut Slab, + ) -> Self { + CompletionList { completions, index } } - pub(crate) fn is_empty(&self) -> bool{ + pub(crate) fn is_empty(&self) -> bool { self.index.start == usize::MAX } - /// Peek at the end of the list (most recently pushed) /// This leaves the list unchanged pub(crate) fn peek_end(&mut self) -> Option<&(io::Result, u32)> { @@ -75,15 +76,17 @@ impl<'a> CompletionList<'a> { /// Pop from front of list #[allow(dead_code)] pub(crate) fn pop(&mut self) -> Option<(io::Result, u32)> { - self.completions.try_remove(self.index.start).map(| Completion{next, val,..} |{ - if next != usize::MAX { - 
self.completions[next].prev = usize::MAX; - } else { - self.index.end = usize::MAX; - } - self.index.start = next; - val - }) + self.completions + .try_remove(self.index.start) + .map(|Completion { next, val, .. }| { + if next != usize::MAX { + self.completions[next].prev = usize::MAX; + } else { + self.index.end = usize::MAX; + } + self.index.start = next; + val + }) } /// Push to the end of the list @@ -111,4 +114,4 @@ impl<'a> Drop for CompletionList<'a> { self.index.start = removed.next; } } -} \ No newline at end of file +} From 5d46bd9bbcf7776647eda780be386bdbd9030443 Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 16:43:08 +0100 Subject: [PATCH 05/17] clippy --- src/driver/op/completion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/driver/op/completion.rs b/src/driver/op/completion.rs index 51a712bf..1d25ac4d 100644 --- a/src/driver/op/completion.rs +++ b/src/driver/op/completion.rs @@ -46,7 +46,7 @@ impl CompletionIndices { CompletionIndices { start, end: start } } - pub(crate) fn into_list<'a>(self, completions: &'a mut Slab) -> CompletionList<'a> { + pub(crate) fn into_list(self, completions: &mut Slab) -> CompletionList<'_> { CompletionList::from_indices(self, completions) } } From 7d59c25d98dbc2c53cbdfc0969ea1f27afbc0cca Mon Sep 17 00:00:00 2001 From: Oliver Bunting Date: Wed, 12 Oct 2022 18:27:23 +0100 Subject: [PATCH 06/17] Add the overlooked Lifecycle::Ignored state logic --- src/driver/op.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/driver/op.rs b/src/driver/op.rs index 4e136ec7..1436b02c 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -221,8 +221,20 @@ impl Lifecycle { } false } - Lifecycle::Ignored(..) => true, - Lifecycle::Completed(..) => unreachable!("invalid operation state"), + lifecycle @ Lifecycle::Ignored(..) 
=> { + // We must check if any more CQEs are expected before dropping + if io_uring::cqueue::more(flags) { + *self = lifecycle; + false + } else { + true + } + } + Lifecycle::Completed(..) => { + // To construct Lifecycle::Completed, a CQE without MORE was received + // we shouldn't be receiving another. + unreachable!("invalid operation state") + } Lifecycle::CompletionList(indices) => { let mut list = indices.into_list(completions); list.push((result, flags)); From b8046dc306d987ad6ae5883eb9b7ff0b4a6f708a Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 19:49:44 +0100 Subject: [PATCH 07/17] Newtype Cqe result tuple --- src/driver/accept.rs | 6 ++-- src/driver/close.rs | 6 ++-- src/driver/connect.rs | 6 ++-- src/driver/fsync.rs | 6 ++-- src/driver/mod.rs | 18 +++------- src/driver/op.rs | 65 ++++++++++++++++++++++++------------- src/driver/op/completion.rs | 29 ++++++++--------- src/driver/open.rs | 6 ++-- src/driver/read.rs | 6 ++-- src/driver/readv.rs | 6 ++-- src/driver/recv_from.rs | 6 ++-- src/driver/rename_at.rs | 6 ++-- src/driver/send_to.rs | 6 ++-- src/driver/unlink_at.rs | 6 ++-- src/driver/write.rs | 6 ++-- src/driver/writev.rs | 6 ++-- 16 files changed, 99 insertions(+), 91 deletions(-) diff --git a/src/driver/accept.rs b/src/driver/accept.rs index 0b5f73b9..0dc2d505 100644 --- a/src/driver/accept.rs +++ b/src/driver/accept.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd, Socket}; use std::net::SocketAddr; use std::{boxed::Box, io}; @@ -37,8 +37,8 @@ impl Op { impl Completable for Accept { type Output = io::Result<(Socket, Option)>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - let fd = result?; + fn complete(self, cqe: op::CqeResult) -> Self::Output { + let fd = cqe.result?; let fd = SharedFd::new(fd as i32); let socket = Socket { fd }; let (_, addr) = unsafe { diff --git a/src/driver/close.rs b/src/driver/close.rs index 
6219e6e0..006cdc39 100644 --- a/src/driver/close.rs +++ b/src/driver/close.rs @@ -1,6 +1,6 @@ use crate::driver::Op; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::io; use std::os::unix::io::RawFd; @@ -21,8 +21,8 @@ impl Op { impl Completable for Close { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - let _ = result?; + fn complete(self, cqe: op::CqeResult) -> Self::Output { + let _ = cqe.result?; Ok(()) } diff --git a/src/driver/connect.rs b/src/driver/connect.rs index c3168fc1..07270c09 100644 --- a/src/driver/connect.rs +++ b/src/driver/connect.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd}; use socket2::SockAddr; use std::io; @@ -36,7 +36,7 @@ impl Op { impl Completable for Connect { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/fsync.rs b/src/driver/fsync.rs index 1cd274f5..4e0faf80 100644 --- a/src/driver/fsync.rs +++ b/src/driver/fsync.rs @@ -2,7 +2,7 @@ use crate::driver::{Op, SharedFd}; use std::io; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use io_uring::{opcode, types}; pub(crate) struct Fsync { @@ -28,7 +28,7 @@ impl Op { impl Completable for Fsync { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index d5492e9c..f09f8a5b 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -36,7 +36,7 @@ mod write; mod writev; -use io_uring::{cqueue, IoUring}; +use io_uring::IoUring; use scoped_tls::scoped_thread_local; use slab::Slab; use 
std::cell::RefCell; @@ -119,7 +119,7 @@ impl Inner { let index = cqe.user_data() as _; - self.ops.complete(index, resultify(&cqe), cqe.flags()); + self.ops.complete(index, cqe.into()); } } @@ -183,9 +183,9 @@ impl Ops { self.lifecycle.remove(index); } - fn complete(&mut self, index: usize, result: io::Result, flags: u32) { + fn complete(&mut self, index: usize, cqe: op::CqeResult) { let completions = &mut self.completions; - if self.lifecycle[index].complete(completions, result, flags) { + if self.lifecycle[index].complete(completions, cqe) { self.lifecycle.remove(index); } } @@ -197,13 +197,3 @@ impl Drop for Ops { assert!(self.completions.is_empty()); } } - -fn resultify(cqe: &cqueue::Entry) -> io::Result { - let res = cqe.result(); - - if res >= 0 { - Ok(res as u32) - } else { - Err(io::Error::from_raw_os_error(-res)) - } -} diff --git a/src/driver/op.rs b/src/driver/op.rs index 1436b02c..f0e9e6b6 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, Waker}; -use io_uring::squeue; +use io_uring::{cqueue, squeue}; mod completion; pub(crate) use completion::*; @@ -34,7 +34,7 @@ pub(crate) struct SingleCQE; pub(crate) trait Completable { type Output; - fn complete(self, result: io::Result, flags: u32) -> Self::Output; + fn complete(self, cqe: CqeResult) -> Self::Output; } pub(crate) enum Lifecycle { @@ -48,14 +48,33 @@ pub(crate) enum Lifecycle { /// must be passed to the driver and held until the operation completes. 
Ignored(Box), - /// The operation has completed with a single result - Completed(io::Result, u32), + /// The operation has completed with a single cqe result + Completed(CqeResult), /// One or more completion results have been recieved /// This holds the indices uniquely identifying the list within the slab CompletionList(CompletionIndices), } +/// A single CQE entry +pub(crate) struct CqeResult { + pub(crate) result: io::Result, + pub(crate) flags: u32, +} + +impl From for CqeResult { + fn from(cqe: cqueue::Entry) -> Self { + let res = cqe.result(); + let flags = cqe.flags(); + let result = if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + }; + CqeResult { result, flags } + } +} + impl Op where T: Completable, @@ -150,10 +169,10 @@ where Poll::Pending } Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(result, flags) => { + Lifecycle::Completed(cqe) => { inner.ops.remove(me.index); me.index = usize::MAX; - Poll::Ready(me.data.take().unwrap().complete(result, flags)) + Poll::Ready(me.data.take().unwrap().complete(cqe)) } Lifecycle::CompletionList(..) => { unreachable!() @@ -183,7 +202,7 @@ impl Drop for Op { // Deallocate list entries, recording if the more CQE's are expected let more = { let mut list = indices.into_list(completions); - io_uring::cqueue::more(list.peek_end().unwrap().1) + io_uring::cqueue::more(list.peek_end().unwrap().flags) // Dropping list dealloctes the list entries }; if more { @@ -199,22 +218,17 @@ impl Drop for Op { } impl Lifecycle { - pub(super) fn complete( - &mut self, - completions: &mut Slab, - result: io::Result, - flags: u32, - ) -> bool { + pub(super) fn complete(&mut self, completions: &mut Slab, cqe: CqeResult) -> bool { use std::mem; match mem::replace(self, Lifecycle::Submitted) { x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) 
=> { - if io_uring::cqueue::more(flags) { + if io_uring::cqueue::more(cqe.flags) { let mut list = CompletionIndices::new().into_list(completions); - list.push((result, flags)); + list.push(cqe); *self = Lifecycle::CompletionList(list.into_indices()); } else { - *self = Lifecycle::Completed(result, flags); + *self = Lifecycle::Completed(cqe); } if let Lifecycle::Waiting(waker) = x { waker.wake(); @@ -223,7 +237,7 @@ impl Lifecycle { } lifecycle @ Lifecycle::Ignored(..) => { // We must check if any more CQEs are expected before dropping - if io_uring::cqueue::more(flags) { + if io_uring::cqueue::more(cqe.flags) { *self = lifecycle; false } else { @@ -237,7 +251,7 @@ impl Lifecycle { } Lifecycle::CompletionList(indices) => { let mut list = indices.into_list(completions); - list.push((result, flags)); + list.push(cqe); *self = Lifecycle::CompletionList(list.into_indices()); false } @@ -263,10 +277,10 @@ mod test { impl Completable for Rc<()> { type Output = Completion; - fn complete(self, result: io::Result, flags: u32) -> Self::Output { + fn complete(self, cqe: CqeResult) -> Self::Output { Completion { - result, - flags, + result: cqe.result, + flags: cqe.flags, data: self.clone(), } } @@ -377,7 +391,11 @@ mod test { assert_eq!(2, Rc::strong_count(&data)); assert_eq!(1, driver.num_operations()); - driver.inner.borrow_mut().ops.complete(index, Ok(1), 0); + let cqe = CqeResult { + result: Ok(1), + flags: 0, + }; + driver.inner.borrow_mut().ops.complete(index, cqe); assert_eq!(1, Rc::strong_count(&data)); assert_eq!(0, driver.num_operations()); release(driver); @@ -399,7 +417,8 @@ mod test { } fn complete(op: &Op>, result: io::Result) { - op.driver.borrow_mut().ops.complete(op.index, result, 0); + let cqe = CqeResult { result, flags: 0 }; + op.driver.borrow_mut().ops.complete(op.index, cqe); } fn release(driver: crate::driver::Driver) { diff --git a/src/driver/op/completion.rs b/src/driver/op/completion.rs index 1d25ac4d..2fde13ae 100644 --- a/src/driver/op/completion.rs 
+++ b/src/driver/op/completion.rs @@ -1,8 +1,7 @@ use slab::Slab; -use std::{ - io, - ops::{Deref, DerefMut}, -}; +use std::ops::{Deref, DerefMut}; + +use crate::driver::op::CqeResult; /// A linked list of CQE events pub(crate) struct CompletionList<'a> { @@ -21,22 +20,22 @@ pub(crate) struct CompletionIndices { /// /// These are held in an indexed linked list pub(crate) struct Completion { - val: (io::Result, u32), + cqe: CqeResult, next: usize, prev: usize, } impl Deref for Completion { - type Target = (io::Result, u32); + type Target = CqeResult; fn deref(&self) -> &Self::Target { - &self.val + &self.cqe } } impl DerefMut for Completion { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.val + &mut self.cqe } } @@ -65,35 +64,35 @@ impl<'a> CompletionList<'a> { /// Peek at the end of the list (most recently pushed) /// This leaves the list unchanged - pub(crate) fn peek_end(&mut self) -> Option<&(io::Result, u32)> { + pub(crate) fn peek_end(&mut self) -> Option<&CqeResult> { if self.index.end == usize::MAX { None } else { - Some(&self.completions[self.index.end].val) + Some(&self.completions[self.index.end].cqe) } } /// Pop from front of list #[allow(dead_code)] - pub(crate) fn pop(&mut self) -> Option<(io::Result, u32)> { + pub(crate) fn pop(&mut self) -> Option { self.completions .try_remove(self.index.start) - .map(|Completion { next, val, .. }| { + .map(|Completion { next, cqe, .. 
}| { if next != usize::MAX { self.completions[next].prev = usize::MAX; } else { self.index.end = usize::MAX; } self.index.start = next; - val + cqe }) } /// Push to the end of the list - pub(crate) fn push(&mut self, val: (io::Result, u32)) { + pub(crate) fn push(&mut self, cqe: CqeResult) { let prev = self.index.end; let completion = Completion { - val, + cqe, next: usize::MAX, prev, }; diff --git a/src/driver/open.rs b/src/driver/open.rs index 83753c75..0d4cc8db 100644 --- a/src/driver/open.rs +++ b/src/driver/open.rs @@ -1,7 +1,7 @@ use crate::driver::{self, Op, SharedFd}; use crate::fs::{File, OpenOptions}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -40,7 +40,7 @@ impl Op { impl Completable for Open { type Output = io::Result; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - Ok(File::from_shared_fd(SharedFd::new(result? as _))) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + Ok(File::from_shared_fd(SharedFd::new(cqe.result? 
as _))) } } diff --git a/src/driver/read.rs b/src/driver/read.rs index 9ee93734..f90545e0 100644 --- a/src/driver/read.rs +++ b/src/driver/read.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::io; use std::task::{Context, Poll}; @@ -56,9 +56,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; diff --git a/src/driver/readv.rs b/src/driver/readv.rs index 60b26a04..2300ac59 100644 --- a/src/driver/readv.rs +++ b/src/driver/readv.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use libc::iovec; use std::io; use std::task::{Context, Poll}; @@ -75,9 +75,9 @@ where { type Output = BufResult>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut bufs = self.bufs; diff --git a/src/driver/recv_from.rs b/src/driver/recv_from.rs index 7fba7a20..d3103e25 100644 --- a/src/driver/recv_from.rs +++ b/src/driver/recv_from.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBufMut, driver::{Op, SharedFd}, @@ -79,9 +79,9 @@ where { type Output = BufResult<(usize, SocketAddr), T>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation 
result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; diff --git a/src/driver/rename_at.rs b/src/driver/rename_at.rs index 7af8acb5..c7ddfc8e 100644 --- a/src/driver/rename_at.rs +++ b/src/driver/rename_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -44,7 +44,7 @@ impl Op { impl Completable for RenameAt { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/send_to.rs b/src/driver/send_to.rs index 9e62b53a..bcb549d8 100644 --- a/src/driver/send_to.rs +++ b/src/driver/send_to.rs @@ -1,5 +1,5 @@ use crate::buf::IoBuf; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd}; use crate::BufResult; use socket2::SockAddr; @@ -77,9 +77,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.buf; diff --git a/src/driver/unlink_at.rs b/src/driver/unlink_at.rs index 59b6f297..fbebfb67 100644 --- a/src/driver/unlink_at.rs +++ b/src/driver/unlink_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -42,7 +42,7 @@ impl Op { impl Completable for Unlink { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, 
cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/write.rs b/src/driver/write.rs index 648cda17..82addef7 100644 --- a/src/driver/write.rs +++ b/src/driver/write.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -60,9 +60,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.buf; diff --git a/src/driver/writev.rs b/src/driver/writev.rs index 3d064e7d..62c4f507 100644 --- a/src/driver/writev.rs +++ b/src/driver/writev.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -78,9 +78,9 @@ where { type Output = BufResult>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.bufs; From 85d82cd98eda6babdd3788c9562d367ced97ce93 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 20:49:53 +0100 Subject: [PATCH 08/17] Add comments and generalise --- src/driver/op.rs | 26 +++++--- src/driver/op/completion.rs | 116 --------------------------------- src/driver/op/slab_list.rs | 125 ++++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 125 deletions(-) delete mode 100644 src/driver/op/completion.rs create mode 100644 src/driver/op/slab_list.rs diff --git a/src/driver/op.rs b/src/driver/op.rs index f0e9e6b6..7785852d 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -8,12 +8,20 @@ use 
std::task::{Context, Poll, Waker}; use io_uring::{cqueue, squeue}; -mod completion; -pub(crate) use completion::*; +mod slab_list; + use slab::Slab; +use slab_list::{SlabListEntry, SlabListIndices}; use crate::driver; +/// A SlabList is used to hold unserved completions. +/// +/// This is relevant to multi-completion Operations, +/// which require an unknown number of CQE events to be +/// captured before completion. +pub(crate) type Completion = SlabListEntry; + /// In-flight operation pub(crate) struct Op { // Driver running the operation @@ -53,7 +61,7 @@ pub(crate) enum Lifecycle { /// One or more completion results have been recieved /// This holds the indices uniquely identifying the list within the slab - CompletionList(CompletionIndices), + SlabList(SlabListIndices), } /// A single CQE entry @@ -174,7 +182,7 @@ where me.index = usize::MAX; Poll::Ready(me.data.take().unwrap().complete(cqe)) } - Lifecycle::CompletionList(..) => { + Lifecycle::SlabList(..) => { unreachable!() } } @@ -198,7 +206,7 @@ impl Drop for Op { Lifecycle::Completed(..) => { inner.ops.remove(self.index); } - Lifecycle::CompletionList(indices) => { + Lifecycle::SlabList(indices) => { // Deallocate list entries, recording if the more CQE's are expected let more = { let mut list = indices.into_list(completions); @@ -224,9 +232,9 @@ impl Lifecycle { match mem::replace(self, Lifecycle::Submitted) { x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => { if io_uring::cqueue::more(cqe.flags) { - let mut list = CompletionIndices::new().into_list(completions); + let mut list = SlabListIndices::new().into_list(completions); list.push(cqe); - *self = Lifecycle::CompletionList(list.into_indices()); + *self = Lifecycle::SlabList(list.into_indices()); } else { *self = Lifecycle::Completed(cqe); } @@ -249,10 +257,10 @@ impl Lifecycle { // we shouldn't be receiving another. 
unreachable!("invalid operation state") } - Lifecycle::CompletionList(indices) => { + Lifecycle::SlabList(indices) => { let mut list = indices.into_list(completions); list.push(cqe); - *self = Lifecycle::CompletionList(list.into_indices()); + *self = Lifecycle::SlabList(list.into_indices()); false } } diff --git a/src/driver/op/completion.rs b/src/driver/op/completion.rs deleted file mode 100644 index 2fde13ae..00000000 --- a/src/driver/op/completion.rs +++ /dev/null @@ -1,116 +0,0 @@ -use slab::Slab; -use std::ops::{Deref, DerefMut}; - -use crate::driver::op::CqeResult; - -/// A linked list of CQE events -pub(crate) struct CompletionList<'a> { - index: CompletionIndices, - completions: &'a mut Slab, -} - -// Index to the first and last Completion of a single list held in the slab -pub(crate) struct CompletionIndices { - start: usize, - end: usize, -} - -/// Multi cycle operations may return an unbounded number of CQE's -/// for a single cycle SQE. -/// -/// These are held in an indexed linked list -pub(crate) struct Completion { - cqe: CqeResult, - next: usize, - prev: usize, -} - -impl Deref for Completion { - type Target = CqeResult; - - fn deref(&self) -> &Self::Target { - &self.cqe - } -} - -impl DerefMut for Completion { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.cqe - } -} - -impl CompletionIndices { - pub(crate) fn new() -> Self { - let start = usize::MAX; - CompletionIndices { start, end: start } - } - - pub(crate) fn into_list(self, completions: &mut Slab) -> CompletionList<'_> { - CompletionList::from_indices(self, completions) - } -} - -impl<'a> CompletionList<'a> { - pub(crate) fn from_indices( - index: CompletionIndices, - completions: &'a mut Slab, - ) -> Self { - CompletionList { completions, index } - } - - pub(crate) fn is_empty(&self) -> bool { - self.index.start == usize::MAX - } - - /// Peek at the end of the list (most recently pushed) - /// This leaves the list unchanged - pub(crate) fn peek_end(&mut self) -> 
Option<&CqeResult> { - if self.index.end == usize::MAX { - None - } else { - Some(&self.completions[self.index.end].cqe) - } - } - - /// Pop from front of list - #[allow(dead_code)] - pub(crate) fn pop(&mut self) -> Option { - self.completions - .try_remove(self.index.start) - .map(|Completion { next, cqe, .. }| { - if next != usize::MAX { - self.completions[next].prev = usize::MAX; - } else { - self.index.end = usize::MAX; - } - self.index.start = next; - cqe - }) - } - - /// Push to the end of the list - pub(crate) fn push(&mut self, cqe: CqeResult) { - let prev = self.index.end; - let completion = Completion { - cqe, - next: usize::MAX, - prev, - }; - self.index.end = self.completions.insert(completion); - self.completions[prev].next = self.index.end; - } - - /// Consume the list, without dropping entries, returning just the start and end indices - pub(crate) fn into_indices(mut self) -> CompletionIndices { - std::mem::replace(&mut self.index, CompletionIndices::new()) - } -} - -impl<'a> Drop for CompletionList<'a> { - fn drop(&mut self) { - while !self.is_empty() { - let removed = self.completions.remove(self.index.start); - self.index.start = removed.next; - } - } -} diff --git a/src/driver/op/slab_list.rs b/src/driver/op/slab_list.rs new file mode 100644 index 00000000..7eed4992 --- /dev/null +++ b/src/driver/op/slab_list.rs @@ -0,0 +1,125 @@ +//! An indexed linked list, with entries held in slab storage. +//! The slab may hold multiple independent lists concurrently. +//! +//! Each list is uniquely identified by a SlabListIndices, +//! which holds the index of the first element of the list. +//! It also holds the index of the last element, to support +//! push operations without list traversal. +//! +//! SlabListIndices may be upgraded to a SlabList, by providing +//! a reference to the backing slab. This seperates the SlabListIndices +//! from the lifetime of the storage (at type level only). 
+use slab::Slab; +use std::ops::{Deref, DerefMut}; + +/// A linked list backed by slab storage +pub(crate) struct SlabList<'a, T> { + index: SlabListIndices, + slab: &'a mut Slab>, +} + +// Indices to the head and tail of a single list held within a SlabList +pub(crate) struct SlabListIndices { + start: usize, + end: usize, +} + +/// Multi cycle operations may return an unbounded number of CQE's +/// for a single cycle SQE. +/// +/// These are held in an indexed linked list +pub(crate) struct SlabListEntry { + entry: T, + next: usize, + prev: usize, +} + +impl Deref for SlabListEntry { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.entry + } +} + +impl DerefMut for SlabListEntry { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.entry + } +} + +impl SlabListIndices { + pub(crate) fn new() -> Self { + let start = usize::MAX; + SlabListIndices { start, end: start } + } + + pub(crate) fn into_list(self, slab: &mut Slab>) -> SlabList<'_, T> { + SlabList::from_indices(self, slab) + } +} + +impl<'a, T> SlabList<'a, T> { + pub(crate) fn from_indices( + index: SlabListIndices, + slab: &'a mut Slab>, + ) -> Self { + SlabList { slab, index } + } + + pub(crate) fn is_empty(&self) -> bool { + self.index.start == usize::MAX + } + + /// Peek at the end of the list (most recently pushed) + /// This leaves the list unchanged + pub(crate) fn peek_end(&mut self) -> Option<&T> { + if self.index.end == usize::MAX { + None + } else { + Some(&self.slab[self.index.end].entry) + } + } + + /// Pop from front of list + #[allow(dead_code)] + pub(crate) fn pop(&mut self) -> Option { + self.slab + .try_remove(self.index.start) + .map(|SlabListEntry { next, entry, .. 
}| { + if next != usize::MAX { + self.slab[next].prev = usize::MAX; + } else { + self.index.end = usize::MAX; + } + self.index.start = next; + entry + }) + } + + /// Push to the end of the list + pub(crate) fn push(&mut self, entry: T) { + let prev = self.index.end; + let entry = SlabListEntry { + entry, + next: usize::MAX, + prev, + }; + self.index.end = self.slab.insert(entry); + self.slab[prev].next = self.index.end; + } + + /// Consume the list, without dropping entries, returning just the start and end indices + pub(crate) fn into_indices(mut self) -> SlabListIndices { + std::mem::replace(&mut self.index, SlabListIndices::new()) + } +} + +impl<'a, T> Drop for SlabList<'a, T> { + fn drop(&mut self) { + while !self.is_empty() { + let removed = self.slab.remove(self.index.start); + self.index.start = removed.next; + } + } +} From 6d058e06371e0663886ccc77c6a4b8a32a10e92d Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 20:52:41 +0100 Subject: [PATCH 09/17] Make singly linked --- src/driver/op/slab_list.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/driver/op/slab_list.rs b/src/driver/op/slab_list.rs index 7eed4992..3b7ec06c 100644 --- a/src/driver/op/slab_list.rs +++ b/src/driver/op/slab_list.rs @@ -31,7 +31,6 @@ pub(crate) struct SlabListIndices { pub(crate) struct SlabListEntry { entry: T, next: usize, - prev: usize, } impl Deref for SlabListEntry { @@ -87,9 +86,7 @@ impl<'a, T> SlabList<'a, T> { self.slab .try_remove(self.index.start) .map(|SlabListEntry { next, entry, .. 
}| { - if next != usize::MAX { - self.slab[next].prev = usize::MAX; - } else { + if next == usize::MAX { self.index.end = usize::MAX; } self.index.start = next; @@ -103,7 +100,6 @@ impl<'a, T> SlabList<'a, T> { let entry = SlabListEntry { entry, next: usize::MAX, - prev, }; self.index.end = self.slab.insert(entry); self.slab[prev].next = self.index.end; From 3050e91dde4d1fabb857afc84c84419014e4370c Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 21:53:40 +0100 Subject: [PATCH 10/17] Add some tests --- src/driver/op/slab_list.rs | 61 ++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/src/driver/op/slab_list.rs b/src/driver/op/slab_list.rs index 3b7ec06c..f9400576 100644 --- a/src/driver/op/slab_list.rs +++ b/src/driver/op/slab_list.rs @@ -5,10 +5,6 @@ //! which holds the index of the first element of the list. //! It also holds the index of the last element, to support //! push operations without list traversal. -//! -//! SlabListIndices may be upgraded to a SlabList, by providing -//! a reference to the backing slab. This seperates the SlabListIndices -//! from the lifetime of the storage (at type level only). 
use slab::Slab; use std::ops::{Deref, DerefMut}; @@ -102,7 +98,12 @@ impl<'a, T> SlabList<'a, T> { next: usize::MAX, }; self.index.end = self.slab.insert(entry); - self.slab[prev].next = self.index.end; + if prev != usize::MAX { + self.slab[prev].next = self.index.end; + } else { + self.index.start = self.index.end; + } + } /// Consume the list, without dropping entries, returning just the start and end indices @@ -119,3 +120,53 @@ impl<'a, T> Drop for SlabList<'a, T> { } } } + + + +mod test { + use super::*; + + #[test] + fn push_pop() { + let mut slab = Slab::with_capacity(8); + let mut list = SlabListIndices::new().into_list(&mut slab); + assert!(list.is_empty()); + assert_eq!(list.pop(), None); + for i in 0..5 { + list.push(i); + assert_eq!(list.peek_end(), Some(&i)); + assert!(!list.is_empty()); + assert!(!list.slab.is_empty()); + } + for i in 0..5 { + assert_eq!(list.pop(), Some(i)) + } + assert!(list.is_empty()); + assert!(list.slab.is_empty()); + assert_eq!(list.pop(), None); + } + + #[test] + fn entries_freed_on_drop() { + let mut slab = Slab::with_capacity(8); + { + let mut list = SlabListIndices::new().into_list(&mut slab); + list.push(42); + assert!(!list.is_empty()); + } + assert!(slab.is_empty()); + } + + #[test] + fn entries_kept_on_converion_to_index() { + let mut slab = Slab::with_capacity(8); + { + let mut list = SlabListIndices::new().into_list(&mut slab); + list.push(42); + assert!(!list.is_empty()); + // This forgets the entries + let _ = list.into_indices(); + } + assert!(!slab.is_empty()); + } +} \ No newline at end of file From b7007753d7f3984c531d42c7a6c1f5309366c515 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 21:53:59 +0100 Subject: [PATCH 11/17] fmt --- src/driver/op/slab_list.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/driver/op/slab_list.rs b/src/driver/op/slab_list.rs index f9400576..967ae5f3 100644 --- a/src/driver/op/slab_list.rs +++ b/src/driver/op/slab_list.rs @@ -103,7 
+103,6 @@ impl<'a, T> SlabList<'a, T> { } else { self.index.start = self.index.end; } - } /// Consume the list, without dropping entries, returning just the start and end indices @@ -121,15 +120,13 @@ impl<'a, T> Drop for SlabList<'a, T> { } } - - mod test { use super::*; #[test] fn push_pop() { let mut slab = Slab::with_capacity(8); - let mut list = SlabListIndices::new().into_list(&mut slab); + let mut list = SlabListIndices::new().into_list(&mut slab); assert!(list.is_empty()); assert_eq!(list.pop(), None); for i in 0..5 { @@ -150,7 +147,7 @@ mod test { fn entries_freed_on_drop() { let mut slab = Slab::with_capacity(8); { - let mut list = SlabListIndices::new().into_list(&mut slab); + let mut list = SlabListIndices::new().into_list(&mut slab); list.push(42); assert!(!list.is_empty()); } @@ -161,7 +158,7 @@ mod test { fn entries_kept_on_converion_to_index() { let mut slab = Slab::with_capacity(8); { - let mut list = SlabListIndices::new().into_list(&mut slab); + let mut list = SlabListIndices::new().into_list(&mut slab); list.push(42); assert!(!list.is_empty()); // This forgets the entries @@ -169,4 +166,4 @@ mod test { } assert!(!slab.is_empty()); } -} \ No newline at end of file +} From 5b6f1aa9e25a90b2bdbfeec9d55f4e35bb82bd8c Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 22:03:31 +0100 Subject: [PATCH 12/17] Add missing cfg(test) --- src/driver/op/slab_list.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/driver/op/slab_list.rs b/src/driver/op/slab_list.rs index 967ae5f3..50122c05 100644 --- a/src/driver/op/slab_list.rs +++ b/src/driver/op/slab_list.rs @@ -120,6 +120,7 @@ impl<'a, T> Drop for SlabList<'a, T> { } } +#[cfg(test)] mod test { use super::*; From af0502bcf1d8fd28691a947928cf1b4eb650b145 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Sun, 23 Oct 2022 22:28:56 +0100 Subject: [PATCH 13/17] Fix over-zealous refactoring --- src/driver/op.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/src/driver/op.rs b/src/driver/op.rs index 7785852d..8a407a71 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -61,7 +61,7 @@ pub(crate) enum Lifecycle { /// One or more completion results have been recieved /// This holds the indices uniquely identifying the list within the slab - SlabList(SlabListIndices), + CompletionList(SlabListIndices), } /// A single CQE entry @@ -182,7 +182,7 @@ where me.index = usize::MAX; Poll::Ready(me.data.take().unwrap().complete(cqe)) } - Lifecycle::SlabList(..) => { + Lifecycle::CompletionList(..) => { unreachable!() } } @@ -206,7 +206,7 @@ impl Drop for Op { Lifecycle::Completed(..) => { inner.ops.remove(self.index); } - Lifecycle::SlabList(indices) => { + Lifecycle::CompletionList(indices) => { // Deallocate list entries, recording if the more CQE's are expected let more = { let mut list = indices.into_list(completions); @@ -234,7 +234,7 @@ impl Lifecycle { if io_uring::cqueue::more(cqe.flags) { let mut list = SlabListIndices::new().into_list(completions); list.push(cqe); - *self = Lifecycle::SlabList(list.into_indices()); + *self = Lifecycle::CompletionList(list.into_indices()); } else { *self = Lifecycle::Completed(cqe); } @@ -257,10 +257,10 @@ impl Lifecycle { // we shouldn't be receiving another. 
unreachable!("invalid operation state") } - Lifecycle::SlabList(indices) => { + Lifecycle::CompletionList(indices) => { let mut list = indices.into_list(completions); list.push(cqe); - *self = Lifecycle::SlabList(list.into_indices()); + *self = Lifecycle::CompletionList(list.into_indices()); false } } From 7f0b32380a0ecf08b1a5de547fe7b8dc79621c7d Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Mon, 24 Oct 2022 10:44:24 +0100 Subject: [PATCH 14/17] Update no_op complete signature --- src/driver/noop.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/driver/noop.rs b/src/driver/noop.rs index 35f45c8e..7464a7fc 100644 --- a/src/driver/noop.rs +++ b/src/driver/noop.rs @@ -1,4 +1,4 @@ -use crate::driver::{op::Completable, Op}; +use crate::driver::{op::{self, Completable}, Op}; use std::io; /// No operation. Just posts a completion event, nothing else. @@ -17,8 +17,8 @@ impl Op { impl Completable for NoOp { type Output = io::Result<()>; - fn complete(self, _result: io::Result, _flags: u32) -> Self::Output { - Ok(()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } From d327eca6ee3d23b7034fa75086d94de1cbfc1f87 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Mon, 24 Oct 2022 10:44:45 +0100 Subject: [PATCH 15/17] fmt --- src/driver/noop.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/driver/noop.rs b/src/driver/noop.rs index 7464a7fc..0d1cdb98 100644 --- a/src/driver/noop.rs +++ b/src/driver/noop.rs @@ -1,4 +1,7 @@ -use crate::driver::{op::{self, Completable}, Op}; +use crate::driver::{ + op::{self, Completable}, + Op, +}; use std::io; /// No operation. Just posts a completion event, nothing else. 
From de1630f388a6fe32a935692dba65ef304f350833 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Tue, 25 Oct 2022 22:32:07 +0100 Subject: [PATCH 16/17] Address nits --- src/driver/op.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/driver/op.rs b/src/driver/op.rs index 801e1047..13bb4c2a 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -175,7 +175,7 @@ where Poll::Ready(me.data.take().unwrap().complete(cqe)) } Lifecycle::CompletionList(..) => { - unreachable!() + unreachable!("No more flag set for SinglCQE") } } } @@ -203,7 +203,7 @@ impl Drop for Op { let more = { let mut list = indices.into_list(completions); io_uring::cqueue::more(list.peek_end().unwrap().flags) - // Dropping list dealloctes the list entries + // Dropping list deallocates the list entries }; if more { // If more are expected, we have to keep the op around @@ -231,6 +231,8 @@ impl Lifecycle { *self = Lifecycle::Completed(cqe); } if let Lifecycle::Waiting(waker) = x { + // wake is called whenever we have more work. + // Also possible would be to defer calling until cqe with !more flag set waker.wake(); } false @@ -244,7 +246,10 @@ impl Lifecycle { true } } + Lifecycle::Completed(..) => { + // Completions with more flag set go straight onto the slab, + // and are handled in Lifecycle::CompletionList. // To construct Lifecycle::Completed, a CQE without MORE was received // we shouldn't be receiving another. 
unreachable!("invalid operation state") From 1d9c3608fceeed08f6a97eb5d8ff6139ffb55e58 Mon Sep 17 00:00:00 2001 From: ollie-etl Date: Thu, 27 Oct 2022 12:46:05 +0100 Subject: [PATCH 17/17] More docs --- src/driver/op.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/driver/op.rs b/src/driver/op.rs index e0c7240f..11049783 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -33,7 +33,7 @@ pub(crate) struct Op { // CqeType marker _cqe_type: PhantomData, - // Make !Send + !Sync + // Make !Send + !Sync _phantom: PhantomUnsendUnsync, } @@ -155,7 +155,6 @@ where .get_mut(me.index) .expect("invalid internal state"); - match mem::replace(lifecycle, Lifecycle::Submitted) { Lifecycle::Submitted => { *lifecycle = Lifecycle::Waiting(cx.waker().clone()); @@ -176,7 +175,7 @@ where Poll::Ready(me.data.take().unwrap().complete(cqe)) } Lifecycle::CompletionList(..) => { - unreachable!("No more flag set for SinglCQE") + unreachable!("No `more` flag set for SingleCQE") } } }) @@ -184,18 +183,25 @@ where } } +/// The operation may have pending cqe's not yet processed. +/// To manage this, the lifecycle associated with the Op may if required +/// be placed in LifeCycle::Ignored state to handle cqe's which arrive after +/// the Op has been dropped. 
impl Drop for Op { fn drop(&mut self) { use std::mem; CONTEXT.with(|runtime_context| { runtime_context.with_driver_mut(|driver| { + // Get the Op Lifecycle state from the driver let (lifecycle, completions) = match driver.ops.get_mut(self.index) { Some(val) => val, - None => return, + None => { + // Op dropped after the driver + return; + } }; - match mem::replace(lifecycle, Lifecycle::Submitted) { Lifecycle::Submitted | Lifecycle::Waiting(_) => { *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); @@ -204,7 +210,7 @@ impl Drop for Op { driver.ops.remove(self.index); } Lifecycle::CompletionList(indices) => { - // Deallocate list entries, recording if the more CQE's are expected + // Deallocate list entries, recording if more CQE's are expected let more = { let mut list = indices.into_list(completions); io_uring::cqueue::more(list.peek_end().unwrap().flags) @@ -238,18 +244,21 @@ impl Lifecycle { *self = Lifecycle::Completed(cqe); } if let Lifecycle::Waiting(waker) = x { - // wake is called whenever we have more work. - // Also possible would be to defer calling until cqe with !more flag set + // waker is woken to notify cqe has arrived + // Note: Maybe defer calling until cqe with !`more` flag set? waker.wake(); } false } + lifecycle @ Lifecycle::Ignored(..) => { - // We must check if any more CQEs are expected before dropping if io_uring::cqueue::more(cqe.flags) { + // Not yet complete. The Op has been dropped, so we can drop the CQE + // but we must keep the lifecycle alive until no more CQE's expected *self = lifecycle; false } else { + // This Op has completed, we can drop true } } @@ -257,11 +266,14 @@ impl Lifecycle { Lifecycle::Completed(..) => { // Completions with more flag set go straight onto the slab, // and are handled in Lifecycle::CompletionList. - // To construct Lifecycle::Completed, a CQE without MORE was received + // To construct Lifecycle::Completed, a CQE with `more` flag unset was received // we shouldn't be receiving another. 
unreachable!("invalid operation state") } + Lifecycle::CompletionList(indices) => { + // A completion list may contain CQE's with and without `more` flag set. + // Only the final one may have `more` unset, although we don't check. let mut list = indices.into_list(completions); list.push(cqe); *self = Lifecycle::CompletionList(list.into_indices());