Skip to content

Commit

Permalink
Name fields (#149)
Browse files Browse the repository at this point in the history
Co-authored-by: ollie-etl <Oliver Bunting@etlsystems.com>
  • Loading branch information
ollie-etl and ollie-etl authored Oct 25, 2022
1 parent c6b884e commit 2297ed7
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 85 deletions.
6 changes: 3 additions & 3 deletions src/driver/accept.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use crate::driver::{Op, SharedFd, Socket};
use std::net::SocketAddr;
use std::{boxed::Box, io};
Expand Down Expand Up @@ -37,8 +37,8 @@ impl Op<Accept> {
impl Completable for Accept {
type Output = io::Result<(Socket, Option<SocketAddr>)>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
let fd = result?;
fn complete(self, cqe: op::CqeResult) -> Self::Output {
let fd = cqe.result?;
let fd = SharedFd::new(fd as i32);
let socket = Socket { fd };
let (_, addr) = unsafe {
Expand Down
6 changes: 3 additions & 3 deletions src/driver/close.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::driver::Op;

use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use std::io;
use std::os::unix::io::RawFd;

Expand All @@ -21,8 +21,8 @@ impl Op<Close> {
impl Completable for Close {
type Output = io::Result<()>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
let _ = result?;
fn complete(self, cqe: op::CqeResult) -> Self::Output {
let _ = cqe.result?;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/driver/connect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use crate::driver::{Op, SharedFd};
use socket2::SockAddr;
use std::io;
Expand Down Expand Up @@ -36,7 +36,7 @@ impl Op<Connect> {
impl Completable for Connect {
type Output = io::Result<()>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
result.map(|_| ())
fn complete(self, cqe: op::CqeResult) -> Self::Output {
cqe.result.map(|_| ())
}
}
6 changes: 3 additions & 3 deletions src/driver/fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::driver::{Op, SharedFd};

use std::io;

use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use io_uring::{opcode, types};

pub(crate) struct Fsync {
Expand All @@ -28,7 +28,7 @@ impl Op<Fsync> {
impl Completable for Fsync {
type Output = io::Result<()>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
result.map(|_| ())
fn complete(self, cqe: op::CqeResult) -> Self::Output {
cqe.result.map(|_| ())
}
}
42 changes: 18 additions & 24 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod write;

mod writev;

use io_uring::{cqueue, IoUring};
use io_uring::IoUring;
use scoped_tls::scoped_thread_local;
use slab::Slab;
use std::cell::RefCell;
Expand All @@ -61,9 +61,11 @@ pub(crate) struct Inner {
pub(crate) uring: IoUring,
}

// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
struct Ops(Slab<op::Lifecycle>);
struct Ops {
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
lifecycle: Slab<op::Lifecycle>,
}

scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);

Expand Down Expand Up @@ -98,7 +100,7 @@ impl Driver {

fn num_operations(&self) -> usize {
let inner = self.inner.borrow();
inner.ops.0.len()
inner.ops.lifecycle.len()
}
}

Expand All @@ -117,7 +119,7 @@ impl Inner {

let index = cqe.user_data() as _;

self.ops.complete(index, resultify(&cqe), cqe.flags());
self.ops.complete(index, cqe.into());
}
}

Expand Down Expand Up @@ -158,42 +160,34 @@ impl Drop for Driver {

impl Ops {
fn new() -> Ops {
Ops(Slab::with_capacity(64))
Ops {
lifecycle: Slab::with_capacity(64),
}
}

fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
self.0.get_mut(index)
self.lifecycle.get_mut(index)
}

// Insert a new operation
fn insert(&mut self) -> usize {
self.0.insert(op::Lifecycle::Submitted)
self.lifecycle.insert(op::Lifecycle::Submitted)
}

// Remove an operation
fn remove(&mut self, index: usize) {
self.0.remove(index);
self.lifecycle.remove(index);
}

fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
if self.0[index].complete(result, flags) {
self.0.remove(index);
fn complete(&mut self, index: usize, cqe: op::CqeResult) {
if self.lifecycle[index].complete(cqe) {
self.lifecycle.remove(index);
}
}
}

impl Drop for Ops {
fn drop(&mut self) {
assert!(self.0.is_empty());
}
}

fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
let res = cqe.result();

if res >= 0 {
Ok(res as u32)
} else {
Err(io::Error::from_raw_os_error(-res))
assert!(self.lifecycle.is_empty());
}
}
9 changes: 6 additions & 3 deletions src/driver/noop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::driver::{op::Completable, Op};
use crate::driver::{
op::{self, Completable},
Op,
};
use std::io;

/// No operation. Just posts a completion event, nothing else.
Expand All @@ -17,8 +20,8 @@ impl Op<NoOp> {
impl Completable for NoOp {
type Output = io::Result<()>;

fn complete(self, _result: io::Result<u32>, _flags: u32) -> Self::Output {
Ok(())
fn complete(self, cqe: op::CqeResult) -> Self::Output {
cqe.result.map(|_| ())
}
}

Expand Down
62 changes: 43 additions & 19 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use io_uring::squeue;
use io_uring::{cqueue, squeue};

use crate::driver;

Expand All @@ -23,7 +23,7 @@ pub(crate) struct Op<T: 'static> {

pub(crate) trait Completable {
type Output;
fn complete(self, result: io::Result<u32>, flags: u32) -> Self::Output;
fn complete(self, cqe: CqeResult) -> Self::Output;
}

pub(crate) enum Lifecycle {
Expand All @@ -37,16 +37,36 @@ pub(crate) enum Lifecycle {
/// must be passed to the driver and held until the operation completes.
Ignored(Box<dyn std::any::Any>),

/// The operation has completed.
Completed(io::Result<u32>, u32),
/// The operation has completed with a single cqe result
Completed(CqeResult),
}

/// A single CQE entry
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
#[allow(dead_code)]
pub(crate) flags: u32,
}

impl From<cqueue::Entry> for CqeResult {
fn from(cqe: cqueue::Entry) -> Self {
let res = cqe.result();
let flags = cqe.flags();
let result = if res >= 0 {
Ok(res as u32)
} else {
Err(io::Error::from_raw_os_error(-res))
};
CqeResult { result, flags }
}
}

impl<T> Op<T>
where
T: Completable,
{
/// Create a new operation
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Op<T> {
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Self {
Op {
driver: inner_rc.clone(),
index: inner.ops.insert(),
Expand All @@ -58,7 +78,7 @@ where
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
Expand All @@ -83,7 +103,7 @@ where
}

/// Try submitting an operation to uring
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
Expand Down Expand Up @@ -122,11 +142,10 @@ where
Poll::Pending
}
Lifecycle::Ignored(..) => unreachable!(),
Lifecycle::Completed(result, flags) => {
Lifecycle::Completed(cqe) => {
inner.ops.remove(me.index);
me.index = usize::MAX;

Poll::Ready(me.data.take().unwrap().complete(result, flags))
Poll::Ready(me.data.take().unwrap().complete(cqe))
}
}
}
Expand All @@ -153,16 +172,16 @@ impl<T> Drop for Op<T> {
}

impl Lifecycle {
pub(super) fn complete(&mut self, result: io::Result<u32>, flags: u32) -> bool {
pub(super) fn complete(&mut self, cqe: CqeResult) -> bool {
use std::mem;

match mem::replace(self, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*self = Lifecycle::Completed(result, flags);
*self = Lifecycle::Completed(cqe);
false
}
Lifecycle::Waiting(waker) => {
*self = Lifecycle::Completed(result, flags);
*self = Lifecycle::Completed(cqe);
waker.wake();
false
}
Expand Down Expand Up @@ -190,10 +209,10 @@ mod test {
impl Completable for Rc<()> {
type Output = Completion;

fn complete(self, result: io::Result<u32>, flags: u32) -> Self::Output {
fn complete(self, cqe: CqeResult) -> Self::Output {
Completion {
result,
flags,
result: cqe.result,
flags: cqe.flags,
data: self.clone(),
}
}
Expand Down Expand Up @@ -304,7 +323,11 @@ mod test {
assert_eq!(2, Rc::strong_count(&data));

assert_eq!(1, driver.num_operations());
driver.inner.borrow_mut().ops.complete(index, Ok(1), 0);
let cqe = CqeResult {
result: Ok(1),
flags: 0,
};
driver.inner.borrow_mut().ops.complete(index, cqe);
assert_eq!(1, Rc::strong_count(&data));
assert_eq!(0, driver.num_operations());
release(driver);
Expand All @@ -326,11 +349,12 @@ mod test {
}

fn complete(op: &Op<Rc<()>>, result: io::Result<u32>) {
op.driver.borrow_mut().ops.complete(op.index, result, 0);
let cqe = CqeResult { result, flags: 0 };
op.driver.borrow_mut().ops.complete(op.index, cqe);
}

fn release(driver: crate::driver::Driver) {
// Clear ops, we aren't really doing any I/O
driver.inner.borrow_mut().ops.0.clear();
driver.inner.borrow_mut().ops.lifecycle.clear();
}
}
6 changes: 3 additions & 3 deletions src/driver/open.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::driver::{self, Op, SharedFd};
use crate::fs::{File, OpenOptions};

use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use std::ffi::CString;
use std::io;
use std::path::Path;
Expand Down Expand Up @@ -40,7 +40,7 @@ impl Op<Open> {
impl Completable for Open {
type Output = io::Result<File>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
Ok(File::from_shared_fd(SharedFd::new(result? as _)))
fn complete(self, cqe: op::CqeResult) -> Self::Output {
Ok(File::from_shared_fd(SharedFd::new(cqe.result? as _)))
}
}
6 changes: 3 additions & 3 deletions src/driver/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::buf::IoBufMut;
use crate::driver::{Op, SharedFd};
use crate::BufResult;

use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use std::io;

pub(crate) struct Read<T> {
Expand Down Expand Up @@ -42,9 +42,9 @@ where
{
type Output = BufResult<usize, T>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
fn complete(self, cqe: op::CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = result.map(|v| v as usize);
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let mut buf = self.buf;

Expand Down
6 changes: 3 additions & 3 deletions src/driver/readv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::buf::IoBufMut;
use crate::driver::{Op, SharedFd};
use crate::BufResult;

use crate::driver::op::Completable;
use crate::driver::op::{self, Completable};
use libc::iovec;
use std::io;

Expand Down Expand Up @@ -61,9 +61,9 @@ where
{
type Output = BufResult<usize, Vec<T>>;

fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
fn complete(self, cqe: op::CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = result.map(|v| v as usize);
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let mut bufs = self.bufs;

Expand Down
Loading

0 comments on commit 2297ed7

Please sign in to comment.