diff --git a/Cargo.toml b/Cargo.toml index ff326c7a6..6521f4676 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ mio = { version = "0.8", features = ["os-poll", "os-ext"]} nix = "0.24" lazy_static = "1.4" tokio = { version = "1", optional = true } +tokio-uring = { version = "0.4.0", optional = true } vmm-sys-util = { version = "0.10", optional = true } vm-memory = { version = "0.9", features = ["backend-mmap"] } virtio-queue = { version = "0.4", optional = true } @@ -36,11 +37,7 @@ core-foundation-sys = { version = ">=0.8", optional = true } [target.'cfg(target_os = "linux")'.dependencies] caps = { version = "0.5", optional = true } -tokio-uring = "0.3" -io-uring = { version = "0.5", features = ["unstable"] } -socket2 = { version = "0.4.4", features = ["all"] } -scoped-tls = "1.0.0" -slab = "0.4.6" +tokio-uring = "0.4" [dev-dependencies] tokio-test = "0.4.2" @@ -49,7 +46,7 @@ vm-memory = { version = "0.9", features = ["backend-mmap", "backend-bitmap"] } [features] default = ["fusedev"] -async-io = ["async-trait", "tokio/fs", "tokio/net", "tokio/sync", "tokio/rt", "tokio/macros"] +async-io = ["async-trait", "tokio-uring", "tokio/fs", "tokio/net", "tokio/sync", "tokio/rt", "tokio/macros"] fusedev = ["vmm-sys-util", "caps", "core-foundation-sys"] virtiofs = ["virtio-queue", "caps"] vhost-user-fs = ["virtiofs", "vhost", "caps"] diff --git a/README.md b/README.md index 46c2f5b8f..8235816cf 100644 --- a/README.md +++ b/README.md @@ -93,10 +93,3 @@ impl FuseServer { This project is licensed under - [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0 - [BSD-3-Clause License](https://opensource.org/licenses/BSD-3-Clause) - -Source code under [src/tokio-uring] is temporarily copied from [tokio-uring](https://github.com/tokio-rs/tokio-uring) -with modifications, which is licensed under [MIT](https://github.com/tokio-rs/tokio-uring/blob/master/LICENSE). - -We will use `crate.io` directly instead of this temporary copy when the pendding PRs is merged. -https://github.com/tokio-rs/tokio-uring/pull/87 -https://github.com/tokio-rs/tokio-uring/pull/88 diff --git a/src/common/async_file.rs b/src/common/async_file.rs index b1653dfb4..82babf632 100644 --- a/src/common/async_file.rs +++ b/src/common/async_file.rs @@ -44,7 +44,7 @@ impl File { .await .map(File::Tokio), #[cfg(target_os = "linux")] - 2 => crate::tokio_uring::fs::OpenOptions::new() + 2 => tokio_uring::fs::OpenOptions::new() .read(true) .write(write) .create(create) @@ -79,7 +79,7 @@ impl File { // Safety: we rely on tokio_uring::fs::File internal implementation details. // It should be implemented as self.async_try_clone().await.unwrap().read_at, // but that causes two more syscalls. - let file = unsafe { crate::tokio_uring::fs::File::from_raw_fd(*fd) }; + let file = unsafe { tokio_uring::fs::File::from_raw_fd(*fd) }; let res = file.read_at(buf, offset).await; std::mem::forget(file); res @@ -105,7 +105,7 @@ impl File { // Safety: we rely on tokio_uring::fs::File internal implementation details. // It should be implemented as self.async_try_clone().await.unwrap().readv_at, // but that causes two more syscalls. - let file = unsafe { crate::tokio_uring::fs::File::from_raw_fd(*fd) }; + let file = unsafe { tokio_uring::fs::File::from_raw_fd(*fd) }; let res = file.readv_at(bufs, offset).await; std::mem::forget(file); res @@ -132,7 +132,7 @@ impl File { // Safety: we rely on tokio_uring::fs::File internal implementation details. // It should be implemented as self.async_try_clone().await.unwrap().write_at, // but that causes two more syscalls. - let file = unsafe { crate::tokio_uring::fs::File::from_raw_fd(*fd) }; + let file = unsafe { tokio_uring::fs::File::from_raw_fd(*fd) }; let res = file.write_at(buf, offset).await; std::mem::forget(file); res @@ -158,7 +158,7 @@ impl File { // Safety: we rely on tokio_uring::fs::File internal implementation details. // It should be implemented as self.async_try_clone().await.unwrap().writev_at, // but that causes two more syscalls. - let file = unsafe { crate::tokio_uring::fs::File::from_raw_fd(*fd) }; + let file = unsafe { tokio_uring::fs::File::from_raw_fd(*fd) }; let res = file.writev_at(bufs, offset).await; std::mem::forget(file); res @@ -214,7 +214,7 @@ impl Drop for File { fn drop(&mut self) { #[cfg(target_os = "linux")] if let File::Uring(fd) = self { - let _ = unsafe { crate::tokio_uring::fs::File::from_raw_fd(*fd) }; + let _ = unsafe { tokio_uring::fs::File::from_raw_fd(*fd) }; } } } diff --git a/src/common/async_runtime.rs b/src/common/async_runtime.rs index 99fc1f262..aab9d42b3 100644 --- a/src/common/async_runtime.rs +++ b/src/common/async_runtime.rs @@ -12,7 +12,7 @@ pub enum Runtime { Tokio(tokio::runtime::Runtime), #[cfg(target_os = "linux")] /// Tokio-uring Runtime. - Uring(std::sync::Mutex), + Uring(std::sync::Mutex), } impl Runtime { @@ -28,7 +28,7 @@ impl Runtime { #[cfg(target_os = "linux")] { // TODO: use io-uring probe to detect supported operations. - if let Ok(rt) = crate::tokio_uring::Runtime::new() { + if let Ok(rt) = tokio_uring::Runtime::new(&tokio_uring::builder()) { return Runtime::Uring(std::sync::Mutex::new(rt)); } } @@ -93,7 +93,7 @@ pub fn spawn(task: T) -> tokio::task::JoinHand CURRENT_RUNTIME.with(|rt| match rt { Runtime::Tokio(_) => tokio::task::spawn_local(task), #[cfg(target_os = "linux")] - Runtime::Uring(_) => crate::tokio_uring::spawn(task), + Runtime::Uring(_) => tokio_uring::spawn(task), }) } diff --git a/src/common/file_buf.rs b/src/common/file_buf.rs index d0d186938..17c73e826 100644 --- a/src/common/file_buf.rs +++ b/src/common/file_buf.rs @@ -354,14 +354,11 @@ impl FileVolatileBuf { } } -#[cfg(all(feature = "async-io", target_os = "linux"))] -pub use crate::tokio_uring::buf::{IoBuf, IoBufMut, Slice}; - #[cfg(all(feature = "async-io", target_os = "linux"))] mod async_io { use super::*; - unsafe impl crate::tokio_uring::buf::IoBuf for FileVolatileBuf { + unsafe impl tokio_uring::buf::IoBuf for FileVolatileBuf { fn stable_ptr(&self) -> *const u8 { self.addr as *const u8 } @@ -375,7 +372,7 @@ mod async_io { } } - unsafe impl crate::tokio_uring::buf::IoBufMut for FileVolatileBuf { + unsafe impl tokio_uring::buf::IoBufMut for FileVolatileBuf { fn stable_mut_ptr(&mut self) -> *mut u8 { self.addr as *mut u8 } @@ -388,7 +385,7 @@ mod async_io { #[cfg(test)] mod tests { use super::*; - use crate::tokio_uring::buf::{IoBuf, IoBufMut}; + use tokio_uring::buf::{IoBuf, IoBufMut}; #[test] fn test_new_file_volatile_buf() { diff --git a/src/common/file_traits.rs b/src/common/file_traits.rs index e21198b99..965925c5a 100644 --- a/src/common/file_traits.rs +++ b/src/common/file_traits.rs @@ -455,11 +455,11 @@ mod async_io { use std::sync::Arc; use tokio::join; + use tokio_uring::buf::IoBuf; use super::*; use crate::async_file::File; use crate::file_buf::FileVolatileBuf; - use crate::tokio_uring::buf::IoBuf; /// Extension of [FileReadWriteVolatile] to support io-uring based asynchronous IO. /// diff --git a/src/common/mod.rs b/src/common/mod.rs index 588243f85..a49cb794a 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -24,13 +24,6 @@ pub mod async_runtime; #[cfg(feature = "async-io")] pub mod mpmc; -// Temporarily include all source code tokio-uring. -// Will switch to upstream once our enhancement have been merged and new version available. -#[cfg(all(feature = "async-io", target_os = "linux"))] -pub mod tokio_uring; -#[cfg(all(feature = "async-io", target_os = "linux"))] -pub(crate) use self::tokio_uring::{buf, driver, fs, future, BufResult}; - #[cfg(target_os = "linux")] #[doc(hidden)] pub use libc::{off64_t, pread64, preadv64, pwrite64, pwritev64}; diff --git a/src/common/tokio_uring/buf/io_buf.rs b/src/common/tokio_uring/buf/io_buf.rs deleted file mode 100644 index b4676627a..000000000 --- a/src/common/tokio_uring/buf/io_buf.rs +++ /dev/null @@ -1,162 +0,0 @@ -use crate::buf::Slice; - -use std::ops; - -/// An `io-uring` compatible buffer. -/// -/// The `IoBuf` trait is implemented by buffer types that can be passed to -/// io-uring operations. Users will not need to use this trait directly, except -/// for the [`slice`] method. -/// -/// # Slicing -/// -/// Because buffers are passed by ownership to the runtime, Rust's slice API -/// (`&buf[..]`) cannot be used. Instead, `tokio-uring` provides an owned slice -/// API: [`slice()`]. The method takes ownership fo the buffer and returns a -/// `Slice` type that tracks the requested offset. -/// -/// # Implementation notes -/// -/// Buffers passed to `io-uring` operations must reference a stable memory -/// region. While the runtime holds ownership to a buffer, the pointer returned -/// by `stable_ptr` must remain valid even if the `IoBuf` value is moved. -/// -/// [`slice()`]: IoBuf::slice -pub unsafe trait IoBuf: Unpin + 'static { - /// Returns a raw pointer to the vector’s buffer. - /// - /// This method is to be used by the `tokio-uring` runtime and it is not - /// expected for users to call it directly. - /// - /// The implementation must ensure that, while the `tokio-uring` runtime - /// owns the value, the pointer returned by `stable_ptr` **does not** - /// change. - fn stable_ptr(&self) -> *const u8; - - /// Number of initialized bytes. - /// - /// This method is to be used by the `tokio-uring` runtime and it is not - /// expected for users to call it directly. - /// - /// For `Vec`, this is identical to `len()`. - fn bytes_init(&self) -> usize; - - /// Total size of the buffer, including uninitialized memory, if any. - /// - /// This method is to be used by the `tokio-uring` runtime and it is not - /// expected for users to call it directly. - /// - /// For `Vec`, this is identical to `capacity()`. - fn bytes_total(&self) -> usize; - - /// Returns a view of the buffer with the specified range. - /// - /// This method is similar to Rust's slicing (`&buf[..]`), but takes - /// ownership of the buffer. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// buf.slice(5..10); - /// ``` - fn slice(self, range: impl ops::RangeBounds) -> Slice - where - Self: Sized, - { - use core::ops::Bound; - - let begin = match range.start_bound() { - Bound::Included(&n) => n, - Bound::Excluded(&n) => n + 1, - Bound::Unbounded => 0, - }; - - assert!(begin < self.bytes_total()); - - let end = match range.end_bound() { - Bound::Included(&n) => n.checked_add(1).expect("out of range"), - Bound::Excluded(&n) => n, - Bound::Unbounded => self.bytes_total(), - }; - - assert!(end <= self.bytes_total()); - assert!(begin <= self.bytes_init()); - - Slice::new(self, begin, end) - } -} - -unsafe impl IoBuf for Vec { - fn stable_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - self.len() - } - - fn bytes_total(&self) -> usize { - self.capacity() - } -} - -unsafe impl IoBuf for &'static [u8] { - fn stable_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - <[u8]>::len(self) - } - - fn bytes_total(&self) -> usize { - self.bytes_init() - } -} - -unsafe impl IoBuf for &'static str { - fn stable_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - ::len(self) - } - - fn bytes_total(&self) -> usize { - self.bytes_init() - } -} - -#[cfg(feature = "bytes")] -unsafe impl IoBuf for bytes::Bytes { - fn stable_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - self.len() - } - - fn bytes_total(&self) -> usize { - self.len() - } -} - -#[cfg(feature = "bytes")] -unsafe impl IoBuf for bytes::BytesMut { - fn stable_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - self.len() - } - - fn bytes_total(&self) -> usize { - self.capacity() - } -} diff --git a/src/common/tokio_uring/buf/io_buf_mut.rs b/src/common/tokio_uring/buf/io_buf_mut.rs deleted file mode 100644 index 905d34de3..000000000 --- a/src/common/tokio_uring/buf/io_buf_mut.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::buf::IoBuf; - -/// A mutable`io-uring` compatible buffer. -/// -/// The `IoBufMut` trait is implemented by buffer types that can be passed to -/// io-uring operations. Users will not need to use this trait directly. -/// -/// # Implementation notes -/// -/// Buffers passed to `io-uring` operations must reference a stable memory -/// region. While the runtime holds ownership to a buffer, the pointer returned -/// by `stable_mut_ptr` must remain valid even if the `IoBufMut` value is moved. -pub unsafe trait IoBufMut: IoBuf { - /// Returns a raw mutable pointer to the vector’s buffer. - /// - /// This method is to be used by the `tokio-uring` runtime and it is not - /// expected for users to call it directly. - /// - /// The implementation must ensure that, while the `tokio-uring` runtime - /// owns the value, the pointer returned by `stable_mut_ptr` **does not** - /// change. - fn stable_mut_ptr(&mut self) -> *mut u8; - - /// Updates the number of initialized bytes. - /// - /// The specified `pos` becomes the new value returned by - /// `IoBuf::bytes_init`. - /// - /// # Safety - /// - /// The caller must ensure that all bytes starting at `stable_mut_ptr()` up - /// to `pos` are initialized and owned by the buffer. - unsafe fn set_init(&mut self, pos: usize); -} - -unsafe impl IoBufMut for Vec { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.as_mut_ptr() - } - - unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } - } -} - -#[cfg(feature = "bytes")] -unsafe impl IoBufMut for bytes::BytesMut { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.as_mut_ptr() - } - - unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } - } -} diff --git a/src/common/tokio_uring/buf/mod.rs b/src/common/tokio_uring/buf/mod.rs deleted file mode 100644 index 50c021d17..000000000 --- a/src/common/tokio_uring/buf/mod.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! Utilities for working with buffers. -//! -//! `io-uring` APIs require passing ownership of buffers to the runtime. The -//! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer -//! types that respect the `io-uring` contract. - -mod io_buf; -pub use io_buf::IoBuf; - -mod io_buf_mut; -pub use io_buf_mut::IoBufMut; - -mod slice; -pub use slice::Slice; - -pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { - // Safety: the `IoBuf` trait is marked as unsafe and is expected to be - // implemented correctly. - unsafe { std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) } -} - -pub(crate) fn deref_mut(buf: &mut impl IoBufMut) -> &mut [u8] { - // Safety: the `IoBufMut` trait is marked as unsafe and is expected to be - // implemented correct. - unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_init()) } -} diff --git a/src/common/tokio_uring/buf/slice.rs b/src/common/tokio_uring/buf/slice.rs deleted file mode 100644 index a76a78253..000000000 --- a/src/common/tokio_uring/buf/slice.rs +++ /dev/null @@ -1,167 +0,0 @@ -use crate::buf::{IoBuf, IoBufMut}; - -use std::cmp; -use std::ops; - -/// An owned view into a contiguous sequence of bytes. -/// -/// This is similar to Rust slices (`&buf[..]`) but owns the underlying buffer. -/// This type is useful for performing io-uring read and write operations using -/// a subset of a buffer. -/// -/// Slices are created using [`IoBuf::slice`]. -/// -/// # Examples -/// -/// Creating a slice -/// -/// ``` -/// use tokio_uring::buf::IoBuf; -/// -/// let buf = b"hello world".to_vec(); -/// let slice = buf.slice(..5); -/// -/// assert_eq!(&slice[..], b"hello"); -/// ``` -pub struct Slice { - buf: T, - begin: usize, - end: usize, -} - -impl Slice { - pub(crate) fn new(buf: T, begin: usize, end: usize) -> Slice { - Slice { buf, begin, end } - } - - /// Offset in the underlying buffer at which this slice starts. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// let slice = buf.slice(1..5); - /// - /// assert_eq!(1, slice.begin()); - /// ``` - pub fn begin(&self) -> usize { - self.begin - } - - /// Ofset in the underlying buffer at which this slice ends. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// let slice = buf.slice(1..5); - /// - /// assert_eq!(5, slice.end()); - /// ``` - pub fn end(&self) -> usize { - self.end - } - - /// Gets a reference to the underlying buffer. - /// - /// This method escapes the slice's view. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// let slice = buf.slice(..5); - /// - /// assert_eq!(slice.get_ref(), b"hello world"); - /// assert_eq!(&slice[..], b"hello"); - /// ``` - pub fn get_ref(&self) -> &T { - &self.buf - } - - /// Gets a mutable reference to the underlying buffer. - /// - /// This method escapes the slice's view. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// let mut slice = buf.slice(..5); - /// - /// slice.get_mut()[0] = b'b'; - /// - /// assert_eq!(slice.get_mut(), b"bello world"); - /// assert_eq!(&slice[..], b"bello"); - /// ``` - pub fn get_mut(&mut self) -> &mut T { - &mut self.buf - } - - /// Unwraps this `Slice`, returning the underlying buffer. - /// - /// # Examples - /// - /// ``` - /// use tokio_uring::buf::IoBuf; - /// - /// let buf = b"hello world".to_vec(); - /// let slice = buf.slice(..5); - /// - /// let buf = slice.into_inner(); - /// assert_eq!(buf, b"hello world"); - /// ``` - pub fn into_inner(self) -> T { - self.buf - } -} - -impl ops::Deref for Slice { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - let buf_bytes = super::deref(&self.buf); - let end = cmp::min(self.end, buf_bytes.len()); - &buf_bytes[self.begin..end] - } -} - -impl ops::DerefMut for Slice { - fn deref_mut(&mut self) -> &mut [u8] { - let buf_bytes = super::deref_mut(&mut self.buf); - let end = cmp::min(self.end, buf_bytes.len()); - &mut buf_bytes[self.begin..end] - } -} - -unsafe impl IoBuf for Slice { - fn stable_ptr(&self) -> *const u8 { - super::deref(&self.buf)[self.begin..].as_ptr() - } - - fn bytes_init(&self) -> usize { - ops::Deref::deref(self).len() - } - - fn bytes_total(&self) -> usize { - self.end - self.begin - } -} - -unsafe impl IoBufMut for Slice { - fn stable_mut_ptr(&mut self) -> *mut u8 { - super::deref_mut(&mut self.buf)[self.begin..].as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - self.buf.set_init(self.begin + pos); - } -} diff --git a/src/common/tokio_uring/buf/slice_mut.rs b/src/common/tokio_uring/buf/slice_mut.rs deleted file mode 100644 index a09f3e90a..000000000 --- a/src/common/tokio_uring/buf/slice_mut.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::buf::{IoBuf, IoBufMut}; - -use std::ops; - -pub struct SliceMut { - buf: T, - begin: usize, - end: usize, -} - -impl SliceMut { - pub(crate) fn new(buf: T, begin: usize, end: usize) -> SliceMut { - SliceMut { - buf, - begin, - end, - } - } - - /// Offset in the underlying buffer at which this slice starts. - pub fn begin(&self) -> usize { - self.begin - } - - pub fn end(&self) -> usize { - self.end - } - - pub fn get_ref(&self) -> &T { - &self.buf - } - - pub fn get_mut(&mut self) -> &mut T { - &mut self.buf - } - - pub fn into_inner(self) -> T { - self.buf - } -} - -impl ops::Deref for SliceMut { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &super::deref(&self.buf)[self.begin..self.end] - } -} - -impl ops::DerefMut for SliceMut { - fn deref_mut(&mut self) -> &mut [u8] { - &mut super::deref_mut(&mut self.buf)[self.begin..self.end] - } -} - -unsafe impl IoBuf for SliceMut { - fn stable_ptr(&self) -> *const u8 { - ops::Deref::deref(self).as_ptr() - } - - fn len(&self) -> usize { - self.end - self.begin - } -} - -unsafe impl IoBufMut for SliceMut { - fn stable_mut_ptr(&mut self) -> *mut u8 { - ops::DerefMut::deref_mut(self).as_mut_ptr() - } - - fn capacity(&self) -> usize { - self.end - self.begin - } - - unsafe fn set_init(&mut self, pos: usize) { - self.buf.set_init(self.begin + pos); - } -} diff --git a/src/common/tokio_uring/driver/accept.rs b/src/common/tokio_uring/driver/accept.rs deleted file mode 100644 index 70f7d21a5..000000000 --- a/src/common/tokio_uring/driver/accept.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::driver::{Op, SharedFd}; -use std::{boxed::Box, io}; - -pub(crate) struct Accept { - fd: SharedFd, - pub(crate) socketaddr: Box<(libc::sockaddr_storage, libc::socklen_t)>, -} - -impl Op { - pub(crate) fn accept(fd: &SharedFd) -> io::Result> { - use io_uring::{opcode, types}; - - let socketaddr = Box::new(( - unsafe { std::mem::zeroed() }, - std::mem::size_of::() as libc::socklen_t, - )); - Op::submit_with( - Accept { - fd: fd.clone(), - socketaddr, - }, - |accept| { - opcode::Accept::new( - types::Fd(accept.fd.raw_fd()), - &mut accept.socketaddr.0 as *mut _ as *mut _, - &mut accept.socketaddr.1, - ) - .flags(libc::O_CLOEXEC) - .build() - }, - ) - } -} diff --git a/src/common/tokio_uring/driver/bind.rs b/src/common/tokio_uring/driver/bind.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/common/tokio_uring/driver/close.rs b/src/common/tokio_uring/driver/close.rs deleted file mode 100644 index 35d37c6b2..000000000 --- a/src/common/tokio_uring/driver/close.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::driver::Op; - -use std::io; -use std::os::unix::io::RawFd; - -pub(crate) struct Close { - fd: RawFd, -} - -impl Op { - pub(crate) fn close(fd: RawFd) -> io::Result> { - use io_uring::{opcode, types}; - - Op::try_submit_with(Close { fd }, |close| { - opcode::Close::new(types::Fd(close.fd)).build() - }) - } -} diff --git a/src/common/tokio_uring/driver/connect.rs b/src/common/tokio_uring/driver/connect.rs deleted file mode 100644 index d2ec5aec8..000000000 --- a/src/common/tokio_uring/driver/connect.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::driver::{Op, SharedFd}; -use socket2::SockAddr; -use std::io; - -/// Open a file -pub(crate) struct Connect { - fd: SharedFd, - socket_addr: SockAddr, -} - -impl Op { - /// Submit a request to connect. - pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result> { - use io_uring::{opcode, types}; - - Op::submit_with( - Connect { - fd: fd.clone(), - socket_addr, - }, - |connect| { - opcode::Connect::new( - types::Fd(connect.fd.raw_fd()), - connect.socket_addr.as_ptr(), - connect.socket_addr.len(), - ) - .build() - }, - ) - } -} diff --git a/src/common/tokio_uring/driver/fsync.rs b/src/common/tokio_uring/driver/fsync.rs deleted file mode 100644 index bfea42036..000000000 --- a/src/common/tokio_uring/driver/fsync.rs +++ /dev/null @@ -1,25 +0,0 @@ -use crate::driver::{Op, SharedFd}; - -use std::io; - -use io_uring::{opcode, types}; - -pub(crate) struct Fsync { - fd: SharedFd, -} - -impl Op { - pub(crate) fn fsync(fd: &SharedFd) -> io::Result> { - Op::submit_with(Fsync { fd: fd.clone() }, |fsync| { - opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build() - }) - } - - pub(crate) fn datasync(fd: &SharedFd) -> io::Result> { - Op::submit_with(Fsync { fd: fd.clone() }, |fsync| { - opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())) - .flags(types::FsyncFlags::DATASYNC) - .build() - }) - } -} diff --git a/src/common/tokio_uring/driver/mod.rs b/src/common/tokio_uring/driver/mod.rs deleted file mode 100644 index 6cf4be7e8..000000000 --- a/src/common/tokio_uring/driver/mod.rs +++ /dev/null @@ -1,194 +0,0 @@ -mod accept; - -mod close; -pub(crate) use close::Close; - -mod connect; - -mod fsync; - -mod op; -pub(crate) use op::Op; - -mod open; - -mod read; - -mod readv; - -mod recv_from; - -mod send_to; - -mod shared_fd; -pub(crate) use shared_fd::SharedFd; - -mod socket; -pub(crate) use socket::Socket; - -mod unlink_at; - -mod util; - -mod write; - -mod writev; - -use io_uring::{cqueue, IoUring}; -use scoped_tls::scoped_thread_local; -use slab::Slab; -use std::cell::RefCell; -use std::io; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::rc::Rc; - -pub(crate) struct Driver { - inner: Handle, -} - -type Handle = Rc>; - -pub(crate) struct Inner { - /// In-flight operations - ops: Ops, - - /// IoUring bindings - pub(crate) uring: IoUring, -} - -// When dropping the driver, all in-flight operations must have completed. This -// type wraps the slab and ensures that, on drop, the slab is empty. -struct Ops(Slab); - -scoped_thread_local!(pub(crate) static CURRENT: Rc>); - -impl Driver { - pub(crate) fn new() -> io::Result { - let uring = IoUring::new(256)?; - - let inner = Rc::new(RefCell::new(Inner { - ops: Ops::new(), - uring, - })); - - Ok(Driver { inner }) - } - - /// Enter the driver context. This enables using uring types. - pub(crate) fn with(&self, f: impl FnOnce() -> R) -> R { - CURRENT.set(&self.inner, || f()) - } - - pub(crate) fn tick(&self) { - let mut inner = self.inner.borrow_mut(); - inner.tick(); - } - - fn wait(&self) -> io::Result { - let mut inner = self.inner.borrow_mut(); - let inner = &mut *inner; - - inner.uring.submit_and_wait(1) - } - - fn num_operations(&self) -> usize { - let inner = self.inner.borrow(); - inner.ops.0.len() - } -} - -impl Inner { - fn tick(&mut self) { - let mut cq = self.uring.completion(); - cq.sync(); - - for cqe in cq { - if cqe.user_data() == u64::MAX { - // Result of the cancellation action. There isn't anything we - // need to do here. We must wait for the CQE for the operation - // that was canceled. - continue; - } - - let index = cqe.user_data() as _; - - self.ops.complete(index, resultify(&cqe), cqe.flags()); - } - } - - pub(crate) fn submit(&mut self) -> io::Result<()> { - loop { - match self.uring.submit() { - Ok(_) => { - self.uring.submission().sync(); - return Ok(()); - } - Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => { - self.tick(); - } - Err(e) => { - return Err(e); - } - } - } - } -} - -impl AsRawFd for Driver { - fn as_raw_fd(&self) -> RawFd { - self.inner.borrow().uring.as_raw_fd() - } -} - -impl Drop for Driver { - fn drop(&mut self) { - while self.num_operations() > 0 { - // If waiting fails, ignore the error. The wait will be attempted - // again on the next loop. - let _ = self.wait().unwrap(); - self.tick(); - } - } -} - -impl Ops { - fn new() -> Ops { - Ops(Slab::with_capacity(64)) - } - - fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> { - self.0.get_mut(index) - } - - // Insert a new operation - fn insert(&mut self) -> usize { - self.0.insert(op::Lifecycle::Submitted) - } - - // Remove an operation - fn remove(&mut self, index: usize) { - self.0.remove(index); - } - - fn complete(&mut self, index: usize, result: io::Result, flags: u32) { - if self.0[index].complete(result, flags) { - self.0.remove(index); - } - } -} - -impl Drop for Ops { - fn drop(&mut self) { - assert!(self.0.is_empty()); - } -} - -fn resultify(cqe: &cqueue::Entry) -> io::Result { - let res = cqe.result(); - - if res >= 0 { - Ok(res as u32) - } else { - Err(io::Error::from_raw_os_error(-res)) - } -} diff --git a/src/common/tokio_uring/driver/op.rs b/src/common/tokio_uring/driver/op.rs deleted file mode 100644 index d244080e8..000000000 --- a/src/common/tokio_uring/driver/op.rs +++ /dev/null @@ -1,331 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll, Waker}; - -use io_uring::squeue; - -use crate::driver; - -/// In-flight operation -pub(crate) struct Op { - // Driver running the operation - pub(super) driver: Rc>, - - // Operation index in the slab - pub(super) index: usize, - - // Per-operation data - data: Option, -} - -/// Operation completion. Returns stored state with the result of the operation. -#[derive(Debug)] -pub(crate) struct Completion { - pub(crate) data: T, - pub(crate) result: io::Result, - // the field is currently only read in tests - #[cfg_attr(not(test), allow(dead_code))] - pub(crate) flags: u32, -} - -pub(crate) enum Lifecycle { - /// The operation has been submitted to uring and is currently in-flight - Submitted, - - /// The submitter is waiting for the completion of the operation - Waiting(Waker), - - /// The submitter no longer has interest in the operation result. The state - /// must be passed to the driver and held until the operation completes. - Ignored(Box), - - /// The operation has completed. - Completed(io::Result, u32), -} - -impl Op { - /// Create a new operation - fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Op { - Op { - driver: inner_rc.clone(), - index: inner.ops.insert(), - data: Some(data), - } - } - - /// Submit an operation to uring. - /// - /// `state` is stored during the operation tracking any state submitted to - /// the kernel. - pub(super) fn submit_with(data: T, f: F) -> io::Result> - where - F: FnOnce(&mut T) -> squeue::Entry, - { - driver::CURRENT.with(|inner_rc| { - let mut inner_ref = inner_rc.borrow_mut(); - let inner = &mut *inner_ref; - - // If the submission queue is full, flush it to the kernel - if inner.uring.submission().is_full() { - inner.submit()?; - } - - // Create the operation - let mut op = Op::new(data, inner, inner_rc); - - // Configure the SQE - let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _); - - { - let mut sq = inner.uring.submission(); - - // Push the new operation - if unsafe { sq.push(&sqe).is_err() } { - unimplemented!("when is this hit?"); - } - } - - Ok(op) - }) - } - - /// Try submitting an operation to uring - pub(super) fn try_submit_with(data: T, f: F) -> io::Result> - where - F: FnOnce(&mut T) -> squeue::Entry, - { - if driver::CURRENT.is_set() { - Op::submit_with(data, f) - } else { - Err(io::ErrorKind::Other.into()) - } - } -} - -impl Future for Op -where - T: Unpin + 'static, -{ - type Output = Completion; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use std::mem; - - let me = &mut *self; - let mut inner = me.driver.borrow_mut(); - let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state"); - - match mem::replace(lifecycle, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) => { - *lifecycle = Lifecycle::Waiting(waker); - Poll::Pending - } - Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(result, flags) => { - inner.ops.remove(me.index); - me.index = usize::MAX; - - Poll::Ready(Completion { - data: me.data.take().expect("unexpected operation state"), - result, - flags, - }) - } - } - } -} - -impl Drop for Op { - fn drop(&mut self) { - let mut inner = self.driver.borrow_mut(); - let lifecycle = match inner.ops.get_mut(self.index) { - Some(lifecycle) => lifecycle, - None => return, - }; - - match lifecycle { - Lifecycle::Submitted | Lifecycle::Waiting(_) => { - *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); - } - Lifecycle::Completed(..) => { - inner.ops.remove(self.index); - } - Lifecycle::Ignored(..) => unreachable!(), - } - } -} - -impl Lifecycle { - pub(super) fn complete(&mut self, result: io::Result, flags: u32) -> bool { - use std::mem; - - match mem::replace(self, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *self = Lifecycle::Completed(result, flags); - false - } - Lifecycle::Waiting(waker) => { - *self = Lifecycle::Completed(result, flags); - waker.wake(); - false - } - Lifecycle::Ignored(..) => true, - Lifecycle::Completed(..) => unreachable!("invalid operation state"), - } - } -} - -#[cfg(test)] -mod test { - use std::rc::Rc; - - use tokio_test::{assert_pending, assert_ready, task}; - - use super::*; - - #[test] - fn op_stays_in_slab_on_drop() { - let (op, driver, data) = init(); - drop(op); - - assert_eq!(2, Rc::strong_count(&data)); - - assert_eq!(1, driver.num_operations()); - release(driver); - } - - #[test] - fn poll_op_once() { - let (op, driver, data) = init(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); - assert_eq!(2, Rc::strong_count(&data)); - - complete(&op, Ok(1)); - assert_eq!(1, driver.num_operations()); - assert_eq!(2, Rc::strong_count(&data)); - - assert!(op.is_woken()); - let Completion { - result, - flags, - data: d, - } = assert_ready!(op.poll()); - assert_eq!(2, Rc::strong_count(&data)); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); - - drop(d); - assert_eq!(1, Rc::strong_count(&data)); - - drop(op); - assert_eq!(0, driver.num_operations()); - - release(driver); - } - - #[test] - fn poll_op_twice() { - let (op, driver, ..) = init(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); - assert_pending!(op.poll()); - - complete(&op, Ok(1)); - - assert!(op.is_woken()); - let Completion { result, flags, .. } = assert_ready!(op.poll()); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); - - release(driver); - } - - #[test] - fn poll_change_task() { - let (op, driver, ..) = init(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); - - let op = op.into_inner(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); - - complete(&op, Ok(1)); - - assert!(op.is_woken()); - let Completion { result, flags, .. } = assert_ready!(op.poll()); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); - - release(driver); - } - - #[test] - fn complete_before_poll() { - let (op, driver, data) = init(); - let mut op = task::spawn(op); - complete(&op, Ok(1)); - assert_eq!(1, driver.num_operations()); - assert_eq!(2, Rc::strong_count(&data)); - - let Completion { result, flags, .. } = assert_ready!(op.poll()); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); - - drop(op); - assert_eq!(0, driver.num_operations()); - - release(driver); - } - - #[test] - fn complete_after_drop() { - let (op, driver, data) = init(); - let index = op.index; - drop(op); - - assert_eq!(2, Rc::strong_count(&data)); - - assert_eq!(1, driver.num_operations()); - driver.inner.borrow_mut().ops.complete(index, Ok(1), 0); - assert_eq!(1, Rc::strong_count(&data)); - assert_eq!(0, driver.num_operations()); - release(driver); - } - - fn init() -> (Op>, crate::driver::Driver, Rc<()>) { - use crate::driver::Driver; - - let driver = Driver::new().unwrap(); - let handle = driver.inner.clone(); - let data = Rc::new(()); - - let op = { - let mut inner = handle.borrow_mut(); - Op::new(data.clone(), &mut inner, &handle) - }; - - (op, driver, data) - } - - fn complete(op: &Op>, result: io::Result) { - op.driver.borrow_mut().ops.complete(op.index, result, 0); - } - - fn release(driver: crate::driver::Driver) { - // Clear ops, we aren't really doing any I/O - driver.inner.borrow_mut().ops.0.clear(); - } -} diff --git a/src/common/tokio_uring/driver/open.rs b/src/common/tokio_uring/driver/open.rs deleted file mode 100644 index 454a40fcc..000000000 --- a/src/common/tokio_uring/driver/open.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::driver::{self, Op}; -use crate::fs::OpenOptions; - -use std::ffi::CString; -use std::io; -use std::path::Path; - -/// Open a file -#[allow(dead_code)] -pub(crate) struct Open { - pub(crate) path: CString, - pub(crate) flags: libc::c_int, -} - -impl Op { - /// Submit a request to open a file. - pub(crate) fn open(path: &Path, options: &OpenOptions) -> io::Result> { - use io_uring::{opcode, types}; - let path = driver::util::cstr(path)?; - let flags = libc::O_CLOEXEC | options.access_mode()? | options.creation_mode()?; - - Op::submit_with(Open { path, flags }, |open| { - // Get a reference to the memory. The string will be held by the - // operation state and will not be accessed again until the operation - // completes. - let p_ref = open.path.as_c_str().as_ptr(); - - opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref) - .flags(flags) - .mode(options.mode) - .build() - }) - } -} diff --git a/src/common/tokio_uring/driver/pool.rs b/src/common/tokio_uring/driver/pool.rs deleted file mode 100644 index 5fe25cee5..000000000 --- a/src/common/tokio_uring/driver/pool.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::driver; - -use io_uring::{opcode, IoUring}; -use std::io; -use std::mem::ManuallyDrop; - -/// Buffer pool shared with kernel -pub(crate) struct Pool { - mem: *mut u8, - num: usize, - size: usize, -} - -pub(crate) struct ProvidedBuf { - buf: ManuallyDrop>, - driver: driver::Handle, -} - -impl Pool { - pub(super) fn new(num: usize, size: usize) -> Pool { - let total = num * size; - let mut mem = ManuallyDrop::new(Vec::::with_capacity(total)); - - assert_eq!(mem.capacity(), total); - - Pool { - mem: mem.as_mut_ptr(), - num, - size, - } - } - - pub(super) fn provide_buffers(&self, uring: &mut IoUring) -> io::Result<()> { - let op = opcode::ProvideBuffers::new(self.mem, self.size as _, self.num as _, 0, 0) - .build() - .user_data(0); - - // Scoped to ensure `sq` drops before trying to submit - { - let mut sq = uring.submission(); - - if unsafe { sq.push(&op) }.is_err() { - unimplemented!("when is this hit?"); - } - } - - uring.submit_and_wait(1)?; - - let mut cq = uring.completion(); - for cqe in &mut cq { - assert_eq!(cqe.user_data(), 0); - } - - Ok(()) - } -} - -impl ProvidedBuf {} - -impl Drop for ProvidedBuf { - fn drop(&mut self) { - let mut driver = self.driver.borrow_mut(); - let pool = &driver.pool; - - let ptr = self.buf.as_mut_ptr(); - let bid = (ptr as usize - pool.mem as usize) / pool.size; - - let op = opcode::ProvideBuffers::new(ptr, pool.size as _, 1, 0, bid as _) - .build() - .user_data(u64::MAX); - - let mut sq = driver.uring.submission(); - - if unsafe { sq.push(&op) }.is_err() { - unimplemented!(); - } - } -} diff --git a/src/common/tokio_uring/driver/read.rs b/src/common/tokio_uring/driver/read.rs deleted file mode 100644 index 931260951..000000000 --- a/src/common/tokio_uring/driver/read.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::buf::IoBufMut; -use crate::driver::{Op, SharedFd}; -use crate::BufResult; - -use std::io; -use std::task::{Context, Poll}; - -pub(crate) struct Read { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - /// Reference to the in-flight buffer. - pub(crate) buf: T, -} - -impl Op> { - pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - use io_uring::{opcode, types}; - - Op::submit_with( - Read { - fd: fd.clone(), - buf, - }, - |read| { - // Get raw buffer info - let ptr = read.buf.stable_mut_ptr(); - let len = read.buf.bytes_total(); - opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) - } - - pub(crate) async fn read(mut self) -> BufResult { - crate::future::poll_fn(move |cx| self.poll_read(cx)).await - } - - pub(crate) fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - - // Convert the operation result to `usize` - let res = complete.result.map(|v| v as usize); - // Recover the buffer - let mut buf = complete.data.buf; - - // If the operation was successful, advance the initialized cursor. - if let Ok(n) = res { - // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } - } - - Poll::Ready((res, buf)) - } -} diff --git a/src/common/tokio_uring/driver/readv.rs b/src/common/tokio_uring/driver/readv.rs deleted file mode 100644 index faf5d95b8..000000000 --- a/src/common/tokio_uring/driver/readv.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::buf::IoBufMut; -use crate::driver::{Op, SharedFd}; -use crate::BufResult; - -use libc::iovec; -use std::io; -use std::task::{Context, Poll}; - -pub(crate) struct Readv { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - /// Reference to the in-flight buffer. - pub(crate) bufs: Vec, - /// Parameter for `io_uring::op::readv`, referring `bufs`. - iovs: Vec, -} - -impl Op> { - pub(crate) fn readv_at( - fd: &SharedFd, - mut bufs: Vec, - offset: u64, - ) -> io::Result>> { - use io_uring::{opcode, types}; - - // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. - let iovs: Vec = bufs - .iter_mut() - .map(|b| iovec { - // Safety guaranteed by `IoBufMut`. - iov_base: unsafe { b.stable_mut_ptr().add(b.bytes_init()) as *mut libc::c_void }, - iov_len: b.bytes_total() - b.bytes_init(), - }) - .collect(); - - Op::submit_with( - Readv { - fd: fd.clone(), - bufs, - iovs, - }, - |read| { - opcode::Readv::new( - types::Fd(fd.raw_fd()), - read.iovs.as_ptr(), - read.iovs.len() as u32, - ) - .offset(offset as _) - .build() - }, - ) - } - - pub(crate) async fn readv(mut self) -> BufResult> { - crate::future::poll_fn(move |cx| self.poll_readv(cx)).await - } - - pub(crate) fn poll_readv(&mut self, cx: &mut Context<'_>) -> Poll>> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - - // Convert the operation result to `usize` - let res = complete.result.map(|v| v as usize); - // Recover the buffers - let mut bufs = complete.data.bufs; - - // If the operation was successful, advance the initialized cursor. - if let Ok(n) = res { - let mut count = n; - for b in bufs.iter_mut() { - let sz = std::cmp::min(count, b.bytes_total() - b.bytes_init()); - let pos = b.bytes_init() + sz; - // Safety: the kernel returns bytes written, and we have ensured that `pos` is - // valid for current buffer. - unsafe { b.set_init(pos) }; - count = count - sz; - if count == 0 { - break; - } - } - assert_eq!(count, 0); - } - - Poll::Ready((res, bufs)) - } -} diff --git a/src/common/tokio_uring/driver/recv_from.rs b/src/common/tokio_uring/driver/recv_from.rs deleted file mode 100644 index b260c8d25..000000000 --- a/src/common/tokio_uring/driver/recv_from.rs +++ /dev/null @@ -1,89 +0,0 @@ -use crate::{ - buf::IoBufMut, - driver::{Op, SharedFd}, - BufResult, -}; -use socket2::SockAddr; -use std::{ - io::IoSliceMut, - task::{Context, Poll}, - {boxed::Box, io, net::SocketAddr}, -}; - -#[allow(dead_code)] -pub(crate) struct RecvFrom { - fd: SharedFd, - pub(crate) buf: T, - io_slices: Vec>, - pub(crate) socket_addr: Box, - pub(crate) msghdr: Box, -} - -impl Op> { - pub(crate) fn recv_from(fd: &SharedFd, mut buf: T) -> io::Result>> { - use io_uring::{opcode, types}; - - let mut io_slices = vec![IoSliceMut::new(unsafe { - std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) - })]; - - let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 }); - - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); - msghdr.msg_iov = io_slices.as_mut_ptr().cast(); - msghdr.msg_iovlen = io_slices.len() as _; - msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); - - Op::submit_with( - RecvFrom { - fd: fd.clone(), - buf, - io_slices, - socket_addr, - msghdr, - }, - |recv_from| { - opcode::RecvMsg::new( - types::Fd(recv_from.fd.raw_fd()), - recv_from.msghdr.as_mut() as *mut _, - ) - .build() - }, - ) - } - - pub(crate) async fn recv(mut self) -> BufResult<(usize, SocketAddr), T> { - use crate::future::poll_fn; - - poll_fn(move |cx| self.poll_recv_from(cx)).await - } - - pub(crate) fn poll_recv_from( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - - // Recover the buffer - let mut buf = complete.data.buf; - - let result = match complete.result { - Ok(v) => { - let v = v as usize; - let socket_addr: Option = (*complete.data.socket_addr).as_socket(); - // If the operation was successful, advance the initialized cursor. - // Safety: the kernel wrote `v` bytes to the buffer. - unsafe { - buf.set_init(v); - } - Ok((v, socket_addr.unwrap())) - } - Err(e) => Err(e), - }; - Poll::Ready((result, buf)) - } -} diff --git a/src/common/tokio_uring/driver/send_to.rs b/src/common/tokio_uring/driver/send_to.rs deleted file mode 100644 index 14974eb89..000000000 --- a/src/common/tokio_uring/driver/send_to.rs +++ /dev/null @@ -1,71 +0,0 @@ -use crate::buf::IoBuf; -use crate::driver::{Op, SharedFd}; -use crate::BufResult; -use socket2::SockAddr; -use std::io::IoSlice; -use std::task::{Context, Poll}; -use std::{boxed::Box, io, net::SocketAddr}; - -pub(crate) struct SendTo { - #[allow(dead_code)] - fd: SharedFd, - pub(crate) buf: T, - #[allow(dead_code)] - io_slices: Vec>, - #[allow(dead_code)] - socket_addr: Box, - pub(crate) msghdr: Box, -} - -impl Op> { - pub(crate) fn send_to( - fd: &SharedFd, - buf: T, - socket_addr: SocketAddr, - ) -> io::Result>> { - use io_uring::{opcode, types}; - - let io_slices = vec![IoSlice::new(unsafe { - std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) - })]; - - let socket_addr = Box::new(SockAddr::from(socket_addr)); - - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); - msghdr.msg_iov = io_slices.as_ptr() as *mut _; - msghdr.msg_iovlen = io_slices.len() as _; - msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); - - Op::submit_with( - SendTo { - fd: fd.clone(), - buf, - io_slices, - socket_addr, - msghdr, - }, - |send_to| { - opcode::SendMsg::new( - types::Fd(send_to.fd.raw_fd()), - send_to.msghdr.as_ref() as *const _, - ) - .build() - }, - ) - } - - pub(crate) async fn send(mut self) -> BufResult { - use crate::future::poll_fn; - - poll_fn(move |cx| self.poll_send(cx)).await - } - - pub(crate) fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - Poll::Ready((complete.result.map(|v| v as _), complete.data.buf)) - } -} diff --git a/src/common/tokio_uring/driver/shared_fd.rs b/src/common/tokio_uring/driver/shared_fd.rs deleted file mode 100644 index c8b25cfff..000000000 --- a/src/common/tokio_uring/driver/shared_fd.rs +++ /dev/null @@ -1,145 +0,0 @@ -use crate::driver::{Close, Op}; -use crate::future::poll_fn; - -use std::cell::RefCell; -use std::os::unix::io::{FromRawFd, RawFd}; -use std::rc::Rc; -use std::task::Waker; - -// Tracks in-flight operations on a file descriptor. Ensures all in-flight -// operations complete before submitting the close. -#[derive(Clone)] -pub(crate) struct SharedFd { - inner: Rc, -} - -struct Inner { - // Open file descriptor - fd: RawFd, - - // Waker to notify when the close operation completes. - state: RefCell, -} - -enum State { - /// Initial state - Init, - - /// Waiting for all in-flight operation to complete. - Waiting(Option), - - /// The FD is closing - Closing(Op), - - /// The FD is fully closed - Closed, -} - -impl SharedFd { - pub(crate) fn new(fd: RawFd) -> SharedFd { - SharedFd { - inner: Rc::new(Inner { - fd, - state: RefCell::new(State::Init), - }), - } - } - - /// Returns the RawFd - pub(crate) fn raw_fd(&self) -> RawFd { - self.inner.fd - } - - /// An FD cannot be closed until all in-flight operation have completed. - /// This prevents bugs where in-flight reads could operate on the incorrect - /// file descriptor. - /// - /// TO model this, if there are no in-flight operations, then - pub(crate) async fn close(mut self) { - // Get a mutable reference to Inner, indicating there are no - // in-flight operations on the FD. - if let Some(inner) = Rc::get_mut(&mut self.inner) { - // Submit the close operation - inner.submit_close_op(); - } - - self.inner.closed().await; - } -} - -impl Inner { - /// If there are no in-flight operations, submit the operation. - fn submit_close_op(&mut self) { - // Close the FD - let state = RefCell::get_mut(&mut self.state); - - // Submit a close operation - *state = match Op::close(self.fd) { - Ok(op) => State::Closing(op), - Err(_) => { - // Submitting the operation failed, we fall back on a - // synchronous `close`. This is safe as, at this point, we - // guarantee all in-flight operations have completed. The most - // common cause for an error is attempting to close the FD while - // off runtime. - // - // This is done by initializing a `File` with the FD and - // dropping it. - // - // TODO: Should we warn? - let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; - - State::Closed - } - }; - } - - /// Completes when the FD has been closed. - async fn closed(&self) { - use std::future::Future; - use std::pin::Pin; - use std::task::Poll; - - poll_fn(|cx| { - let mut state = self.state.borrow_mut(); - - match &mut *state { - State::Init => { - *state = State::Waiting(Some(cx.waker().clone())); - Poll::Pending - } - State::Waiting(Some(waker)) => { - if !waker.will_wake(cx.waker()) { - *waker = cx.waker().clone(); - } - - Poll::Pending - } - State::Waiting(None) => { - *state = State::Waiting(Some(cx.waker().clone())); - Poll::Pending - } - State::Closing(op) => { - // Nothing to do if the close opeation failed. - let _ = ready!(Pin::new(op).poll(cx)); - *state = State::Closed; - Poll::Ready(()) - } - State::Closed => Poll::Ready(()), - } - }) - .await; - } -} - -impl Drop for Inner { - fn drop(&mut self) { - // Submit the close operation, if needed - match RefCell::get_mut(&mut self.state) { - State::Init | State::Waiting(..) => { - self.submit_close_op(); - } - _ => {} - } - } -} diff --git a/src/common/tokio_uring/driver/socket.rs b/src/common/tokio_uring/driver/socket.rs deleted file mode 100644 index ed028a5e3..000000000 --- a/src/common/tokio_uring/driver/socket.rs +++ /dev/null @@ -1,146 +0,0 @@ -use crate::{ - buf::{IoBuf, IoBufMut}, - driver::{Op, SharedFd}, -}; -use std::{ - io, - net::SocketAddr, - os::unix::io::{AsRawFd, IntoRawFd, RawFd}, - path::Path, -}; - -#[derive(Clone)] -pub(crate) struct Socket { - /// Open file descriptor - fd: SharedFd, -} - -pub(crate) fn get_domain(socket_addr: SocketAddr) -> libc::c_int { - match socket_addr { - SocketAddr::V4(_) => libc::AF_INET, - SocketAddr::V6(_) => libc::AF_INET6, - } -} - -impl Socket { - pub(crate) fn new(socket_addr: SocketAddr, socket_type: libc::c_int) -> io::Result { - let socket_type = socket_type | libc::SOCK_CLOEXEC; - let domain = get_domain(socket_addr); - let fd = socket2::Socket::new(domain.into(), socket_type.into(), None)?.into_raw_fd(); - let fd = SharedFd::new(fd); - Ok(Socket { fd }) - } - - pub(crate) fn new_unix(socket_type: libc::c_int) -> io::Result { - let socket_type = socket_type | libc::SOCK_CLOEXEC; - let domain = libc::AF_UNIX; - let fd = socket2::Socket::new(domain.into(), socket_type.into(), None)?.into_raw_fd(); - let fd = SharedFd::new(fd); - Ok(Socket { fd }) - } - - pub(crate) async fn write(&self, buf: T) -> crate::BufResult { - let op = Op::write_at(&self.fd, buf, 0).unwrap(); - op.write().await - } - - pub(crate) async fn send_to( - &self, - buf: T, - socket_addr: SocketAddr, - ) -> crate::BufResult { - let op = Op::send_to(&self.fd, buf, socket_addr).unwrap(); - op.send().await - } - - pub(crate) async fn read(&self, buf: T) -> crate::BufResult { - let op = Op::read_at(&self.fd, buf, 0).unwrap(); - op.read().await - } - - pub(crate) async fn recv_from( - &self, - buf: T, - ) -> crate::BufResult<(usize, SocketAddr), T> { - let op = Op::recv_from(&self.fd, buf).unwrap(); - op.recv().await - } - - pub(crate) async fn accept(&self) -> io::Result<(Socket, Option)> { - let op = Op::accept(&self.fd)?; - let completion = op.await; - let fd = completion.result?; - let fd = SharedFd::new(fd as i32); - let data = completion.data; - let socket = Socket { fd }; - let (_, addr) = unsafe { - socket2::SockAddr::init(move |addr_storage, len| { - *addr_storage = data.socketaddr.0.to_owned(); - *len = data.socketaddr.1; - Ok(()) - })? - }; - Ok((socket, addr.as_socket())) - } - - pub(crate) async fn connect(&self, socket_addr: socket2::SockAddr) -> io::Result<()> { - let op = Op::connect(&self.fd, socket_addr)?; - let completion = op.await; - completion.result?; - Ok(()) - } - - pub(crate) fn bind(socket_addr: SocketAddr, socket_type: libc::c_int) -> io::Result { - Self::bind_internal( - socket_addr.into(), - get_domain(socket_addr).into(), - socket_type.into(), - ) - } - - pub(crate) fn bind_unix>( - path: P, - socket_type: libc::c_int, - ) -> io::Result { - let addr = socket2::SockAddr::unix(path.as_ref())?; - Self::bind_internal(addr, libc::AF_UNIX.into(), socket_type.into()) - } - - pub(crate) fn from_std(socket: std::net::UdpSocket) -> Socket { - let fd = SharedFd::new(socket.into_raw_fd()); - Self { fd } - } - - fn bind_internal( - socket_addr: socket2::SockAddr, - domain: socket2::Domain, - socket_type: socket2::Type, - ) -> io::Result { - let sys_listener = socket2::Socket::new(domain, socket_type, None)?; - let addr = socket2::SockAddr::from(socket_addr); - - sys_listener.set_reuse_port(true)?; - sys_listener.set_reuse_address(true)?; - - // TODO: config for buffer sizes - // sys_listener.set_send_buffer_size(send_buf_size)?; - // sys_listener.set_recv_buffer_size(recv_buf_size)?; - - sys_listener.bind(&addr)?; - - let fd = SharedFd::new(sys_listener.into_raw_fd()); - - Ok(Self { fd }) - } - - pub(crate) fn listen(&self, backlog: libc::c_int) -> io::Result<()> { - syscall!(listen(self.as_raw_fd(), backlog))?; - Ok(()) - } -} - -impl AsRawFd for Socket { - fn as_raw_fd(&self) -> RawFd { - self.fd.raw_fd() - } -} diff --git a/src/common/tokio_uring/driver/unlink_at.rs b/src/common/tokio_uring/driver/unlink_at.rs deleted file mode 100644 index d33eba857..000000000 --- a/src/common/tokio_uring/driver/unlink_at.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::driver::{self, Op}; - -use std::ffi::CString; -use std::io; -use std::path::Path; - -/// Unlink a path relative to the current working directory of the caller's process. -pub(crate) struct Unlink { - pub(crate) path: CString, -} - -impl Op { - /// Submit a request to unlink a directory with provided flags. - pub(crate) fn unlink_dir(path: &Path) -> io::Result> { - Self::unlink(path, libc::AT_REMOVEDIR) - } - - /// Submit a request to unlink a file with provided flags. - pub(crate) fn unlink_file(path: &Path) -> io::Result> { - Self::unlink(path, 0) - } - - /// Submit a request to unlink a specifed path with provided flags. - pub(crate) fn unlink(path: &Path, flags: i32) -> io::Result> { - use io_uring::{opcode, types}; - - let path = driver::util::cstr(path)?; - - Op::submit_with(Unlink { path }, |unlink| { - // Get a reference to the memory. The string will be held by the - // operation state and will not be accessed again until the operation - // completes. - let p_ref = unlink.path.as_c_str().as_ptr(); - opcode::UnlinkAt::new(types::Fd(libc::AT_FDCWD), p_ref) - .flags(flags) - .build() - }) - } -} diff --git a/src/common/tokio_uring/driver/util.rs b/src/common/tokio_uring/driver/util.rs deleted file mode 100644 index d15280a35..000000000 --- a/src/common/tokio_uring/driver/util.rs +++ /dev/null @@ -1,8 +0,0 @@ -use std::ffi::CString; -use std::io; -use std::path::Path; - -pub(super) fn cstr(p: &Path) -> io::Result { - use std::os::unix::ffi::OsStrExt; - Ok(CString::new(p.as_os_str().as_bytes())?) -} diff --git a/src/common/tokio_uring/driver/write.rs b/src/common/tokio_uring/driver/write.rs deleted file mode 100644 index 9a941ee0b..000000000 --- a/src/common/tokio_uring/driver/write.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::{ - buf::IoBuf, - driver::{Op, SharedFd}, - BufResult, -}; -use std::{ - io, - task::{Context, Poll}, -}; - -pub(crate) struct Write { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - pub(crate) buf: T, -} - -impl Op> { - pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - use io_uring::{opcode, types}; - - Op::submit_with( - Write { - fd: fd.clone(), - buf, - }, - |write| { - // Get raw buffer info - let ptr = write.buf.stable_ptr(); - let len = write.buf.bytes_init(); - - opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) - } - - pub(crate) async fn write(mut self) -> BufResult { - use crate::future::poll_fn; - - poll_fn(move |cx| self.poll_write(cx)).await - } - - pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - Poll::Ready((complete.result.map(|v| v as _), complete.data.buf)) - } -} diff --git a/src/common/tokio_uring/driver/writev.rs b/src/common/tokio_uring/driver/writev.rs deleted file mode 100644 index 7ed8b9c51..000000000 --- a/src/common/tokio_uring/driver/writev.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::{ - buf::IoBuf, - driver::{Op, SharedFd}, - BufResult, -}; -use libc::iovec; -use std::{ - io, - task::{Context, Poll}, -}; - -pub(crate) struct Writev { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - pub(crate) bufs: Vec, - - /// Parameter for `io_uring::op::readv`, referring `bufs`. - iovs: Vec, -} - -impl Op> { - pub(crate) fn writev_at( - fd: &SharedFd, - mut bufs: Vec, - offset: u64, - ) -> io::Result>> { - use io_uring::{opcode, types}; - - // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. - let iovs: Vec = bufs - .iter_mut() - .map(|b| iovec { - iov_base: b.stable_ptr() as *mut libc::c_void, - iov_len: b.bytes_init(), - }) - .collect(); - - Op::submit_with( - Writev { - fd: fd.clone(), - bufs, - iovs, - }, - |write| { - opcode::Writev::new( - types::Fd(fd.raw_fd()), - write.iovs.as_ptr(), - write.iovs.len() as u32, - ) - .offset(offset as _) - .build() - }, - ) - } - - pub(crate) async fn writev(mut self) -> BufResult> { - use crate::future::poll_fn; - - poll_fn(move |cx| self.poll_writev(cx)).await - } - - pub(crate) fn poll_writev(&mut self, cx: &mut Context<'_>) -> Poll>> { - use std::future::Future; - use std::pin::Pin; - - let complete = ready!(Pin::new(self).poll(cx)); - Poll::Ready((complete.result.map(|v| v as _), complete.data.bufs)) - } -} diff --git a/src/common/tokio_uring/fs/directory.rs b/src/common/tokio_uring/fs/directory.rs deleted file mode 100644 index 6fe76bdf7..000000000 --- a/src/common/tokio_uring/fs/directory.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::driver::Op; - -use std::io; -use std::path::Path; - -/// Removes an empty directory. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::remove_dir; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// remove_dir("/some/dir").await?; -/// Ok::<(), std::io::Error>(()) -/// })?; -/// Ok(()) -/// } -/// ``` -pub async fn remove_dir>(path: P) -> io::Result<()> { - let op = Op::unlink_dir(path.as_ref())?; - let completion = op.await; - completion.result?; - - Ok(()) -} diff --git a/src/common/tokio_uring/fs/file.rs b/src/common/tokio_uring/fs/file.rs deleted file mode 100644 index 21139a686..000000000 --- a/src/common/tokio_uring/fs/file.rs +++ /dev/null @@ -1,481 +0,0 @@ -use crate::buf::{IoBuf, IoBufMut}; -use crate::driver::{Op, SharedFd}; -use crate::fs::OpenOptions; - -use std::fmt; -use std::io; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::path::Path; - -/// A reference to an open file on the filesystem. -/// -/// An instance of a `File` can be read and/or written depending on what options -/// it was opened with. The `File` type provides **positional** read and write -/// operations. The file does not maintain an internal cursor. The caller is -/// required to specify an offset when issuing an operation. -/// -/// While files are automatically closed when they go out of scope, the -/// operation happens asynchronously in the background. It is recommended to -/// call the `close()` function in order to guarantee that the file successfully -/// closed before exiting the scope. Closing a file does not guarantee writes -/// have persisted to disk. Use [`sync_all`] to ensure all writes have reached -/// the filesystem. -/// -/// [`sync_all`]: File::sync_all -/// -/// # Examples -/// -/// Creates a new file and write data to it: -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::create("hello.txt").await?; -/// -/// // Write some data -/// let (res, buf) = file.write_at(&b"hello world"[..], 0).await; -/// let n = res?; -/// -/// println!("wrote {} bytes", n); -/// -/// // Sync data to the file system. -/// file.sync_all().await?; -/// -/// // Close the file -/// file.close().await?; -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub struct File { - /// Open file descriptor - fd: SharedFd, -} - -impl File { - /// Attempts to open a file in read-only mode. - /// - /// See the [`OpenOptions::open`] method for more details. - /// - /// # Errors - /// - /// This function will return an error if `path` does not already exist. - /// Other errors may also be returned according to [`OpenOptions::open`]. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::open("foo.txt").await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn open(path: impl AsRef) -> io::Result { - OpenOptions::new().read(true).open(path).await - } - - /// Opens a file in write-only mode. - /// - /// This function will create a file if it does not exist, - /// and will truncate it if it does. - /// - /// See the [`OpenOptions::open`] function for more details. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::create("foo.txt").await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn create(path: impl AsRef) -> io::Result { - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path) - .await - } - - pub(crate) fn from_shared_fd(fd: SharedFd) -> File { - File { fd } - } - - /// Read some bytes at the specified offset from the file into the specified - /// buffer, returning how many bytes were read. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// as an argument. - /// - /// If the method returns [`Ok(n)`], then the read was successful. A nonzero - /// `n` value indicates that the buffer has been filled with `n` bytes of - /// data from the file. If `n` is `0`, then one of the following happened: - /// - /// 1. The specified offset is the end of the file. - /// 2. The buffer specified was 0 bytes in length. - /// - /// It is not an error if the returned value `n` is smaller than the buffer - /// size, even when the file contains enough data to fill the buffer. - /// - /// # Errors - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. The buffer is returned on error. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::open("foo.txt").await?; - /// let buffer = vec![0; 10]; - /// - /// // Read up to 10 bytes - /// let (res, buffer) = f.read_at(buffer, 0).await; - /// let n = res?; - /// - /// println!("The bytes: {:?}", &buffer[..n]); - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { - // Submit the read operation - let op = Op::read_at(&self.fd, buf, pos).unwrap(); - op.read().await - } - - /// Read some bytes at the specified offset from the file into the specified - /// array of buffers, returning how many bytes were read. - /// - /// # Return - /// - /// The method returns the operation result and the same array of buffers - /// passed as an argument. - /// - /// If the method returns [`Ok(n)`], then the read was successful. A nonzero - /// `n` value indicates that the buffers have been filled with `n` bytes of - /// data from the file. If `n` is `0`, then one of the following happened: - /// - /// 1. The specified offset is the end of the file. - /// 2. The buffers specified were 0 bytes in length. - /// - /// It is not an error if the returned value `n` is smaller than the buffer - /// size, even when the file contains enough data to fill the buffer. - /// - /// # Errors - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. The buffer is returned on error. - /// - /// # Examples - /// - /// ```ignore - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::open("foo.txt").await?; - /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)]; - /// - /// // Read up to 20 bytes - /// let (res, buffer) = f.readv_at(buffers, 0).await; - /// let n = res?; - /// - /// println!("Read {} bytes", n); - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn readv_at( - &self, - bufs: Vec, - pos: u64, - ) -> crate::BufResult> { - // Submit the read operation - let op = Op::readv_at(&self.fd, bufs, pos).unwrap(); - op.readv().await - } - - /// Write data from buffers into this file at the specified offset, - /// returning how many bytes were written. - /// - /// This function will attempt to write the entire contents of `bufs`, but - /// the entire write may not succeed, or the write may also generate an - /// error. The bytes will be written starting at the specified offset. - /// - /// # Return - /// - /// The method returns the operation result and the same array of buffers passed - /// in as an argument. A return value of `0` typically means that the - /// underlying file is no longer able to accept bytes and will likely not be - /// able to in the future as well, or that the buffer provided is empty. - /// - /// # Errors - /// - /// Each call to `write` may generate an I/O error indicating that the - /// operation could not be completed. If an error is returned then no bytes - /// in the buffer were written to this writer. - /// - /// It is **not** considered an error if the entire buffer could not be - /// written to this writer. - /// - /// # Examples - /// - /// ```ignore - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = File::create("foo.txt").await?; - /// - /// // Writes some prefix of the byte string, not necessarily all of it. - /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()]; - /// let (res, _) = file.write_at(bufs, 0).await; - /// let n = res?; - /// - /// println!("wrote {} bytes", n); - /// - /// // Close the file - /// file.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// [`Ok(n)`]: Ok - pub async fn writev_at( - &self, - buf: Vec, - pos: u64, - ) -> crate::BufResult> { - let op = Op::writev_at(&self.fd, buf, pos).unwrap(); - op.writev().await - } - - /// Write a buffer into this file at the specified offset, returning how - /// many bytes were written. - /// - /// This function will attempt to write the entire contents of `buf`, but - /// the entire write may not succeed, or the write may also generate an - /// error. The bytes will be written starting at the specified offset. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// in as an argument. A return value of `0` typically means that the - /// underlying file is no longer able to accept bytes and will likely not be - /// able to in the future as well, or that the buffer provided is empty. - /// - /// # Errors - /// - /// Each call to `write` may generate an I/O error indicating that the - /// operation could not be completed. If an error is returned then no bytes - /// in the buffer were written to this writer. - /// - /// It is **not** considered an error if the entire buffer could not be - /// written to this writer. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = File::create("foo.txt").await?; - /// - /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_at(&b"some bytes"[..], 0).await; - /// let n = res?; - /// - /// println!("wrote {} bytes", n); - /// - /// // Close the file - /// file.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// [`Ok(n)`]: Ok - pub async fn write_at(&self, buf: T, pos: u64) -> crate::BufResult { - let op = Op::write_at(&self.fd, buf, pos).unwrap(); - op.write().await - } - - /// Attempts to sync all OS-internal metadata to disk. - /// - /// This function will attempt to ensure that all in-memory data reaches the - /// filesystem before completing. - /// - /// This can be used to handle errors that would otherwise only be caught - /// when the `File` is closed. Dropping a file will ignore errors in - /// synchronizing this in-memory data. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; - /// let n = res?; - /// - /// f.sync_all().await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn sync_all(&self) -> io::Result<()> { - let op = Op::fsync(&self.fd).unwrap(); - let completion = op.await; - - completion.result?; - Ok(()) - } - - /// Attempts to sync file data to disk. - /// - /// This method is similar to [`sync_all`], except that it may not - /// synchronize file metadata to the filesystem. - /// - /// This is intended for use cases that must synchronize content, but don't - /// need the metadata on disk. The goal of this method is to reduce disk - /// operations. - /// - /// Note that some platforms may simply implement this in terms of - /// [`sync_all`]. - /// - /// [`sync_all`]: File::sync_all - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; - /// let n = res?; - /// - /// f.sync_data().await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn sync_data(&self) -> io::Result<()> { - let op = Op::datasync(&self.fd).unwrap(); - let completion = op.await; - - completion.result?; - Ok(()) - } - - /// Closes the file. - /// - /// The method completes once the close operation has completed, - /// guaranteeing that resources associated with the file have been released. - /// - /// If `close` is not called before dropping the file, the file is closed in - /// the background, but there is no guarantee as to **when** the close - /// operation will complete. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// // Open the file - /// let f = File::open("foo.txt").await?; - /// // Close the file - /// f.close().await?; - /// - /// Ok(()) - /// }) - /// } - /// ``` - pub async fn close(self) -> io::Result<()> { - self.fd.close().await; - Ok(()) - } -} - -impl FromRawFd for File { - unsafe fn from_raw_fd(fd: RawFd) -> Self { - File::from_shared_fd(SharedFd::new(fd)) - } -} - -impl AsRawFd for File { - fn as_raw_fd(&self) -> RawFd { - self.fd.raw_fd() - } -} - -impl fmt::Debug for File { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("File") - .field("fd", &self.fd.raw_fd()) - .finish() - } -} - -/// Removes a File -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::remove_file; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// remove_file("/some/file.txt").await?; -/// Ok::<(), std::io::Error>(()) -/// })?; -/// Ok(()) -/// } -/// ``` -pub async fn remove_file>(path: P) -> io::Result<()> { - Op::unlink_file(path.as_ref())?.await.result.map(|_| ()) -} diff --git a/src/common/tokio_uring/fs/mod.rs b/src/common/tokio_uring/fs/mod.rs deleted file mode 100644 index 2e4777792..000000000 --- a/src/common/tokio_uring/fs/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Filesystem manipulation operations. - -mod directory; -pub use directory::remove_dir; - -mod file; -pub use file::remove_file; -pub use file::File; - -mod open_options; -pub use open_options::OpenOptions; diff --git a/src/common/tokio_uring/fs/open_options.rs b/src/common/tokio_uring/fs/open_options.rs deleted file mode 100644 index 1b6846dc3..000000000 --- a/src/common/tokio_uring/fs/open_options.rs +++ /dev/null @@ -1,376 +0,0 @@ -use crate::driver::{Op, SharedFd}; -use crate::fs::File; - -use std::io; -use std::path::Path; - -/// Options and flags which can be used to configure how a file is opened. -/// -/// This builder exposes the ability to configure how a [`File`] is opened and -/// what operations are permitted on the open file. The [`File::open`] and -/// [`File::create`] methods are aliases for commonly used options using this -/// builder. -/// -/// Generally speaking, when using `OpenOptions`, you'll first call -/// [`OpenOptions::new`], then chain calls to methods to set each option, then -/// call [`OpenOptions::open`], passing the path of the file you're trying to -/// open. This will give you a [`io::Result`] with a [`File`] inside that you -/// can further operate on. -/// -/// # Examples -/// -/// Opening a file to read: -/// -/// ```no_run -/// use tokio_uring::fs::OpenOptions; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// let file = OpenOptions::new() -/// .read(true) -/// .open("foo.txt") -/// .await?; -/// Ok(()) -/// }) -/// } -/// ``` -/// -/// Opening a file for both reading and writing, as well as creating it if it -/// doesn't exist: -/// -/// ```no_run -/// use tokio_uring::fs::OpenOptions; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// let file = OpenOptions::new() -/// .read(true) -/// .write(true) -/// .create(true) -/// .open("foo.txt") -/// .await?; -/// Ok(()) -/// }) -/// } -/// ``` -#[derive(Debug, Clone)] -pub struct OpenOptions { - read: bool, - write: bool, - append: bool, - truncate: bool, - create: bool, - create_new: bool, - pub(crate) mode: libc::mode_t, -} - -impl OpenOptions { - /// Creates a blank new set of options ready for configuration. - /// - /// All options are initially set to `false`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .read(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn new() -> OpenOptions { - OpenOptions { - // generic - read: false, - write: false, - append: false, - truncate: false, - create: false, - create_new: false, - mode: 0o666, - } - } - - /// Sets the option for read access. - /// - /// This option, when true, will indicate that the file should be - /// `read`-able if opened. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .read(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn read(&mut self, read: bool) -> &mut OpenOptions { - self.read = read; - self - } - - /// Sets the option for write access. - /// - /// This option, when true, will indicate that the file should be - /// `write`-able if opened. - /// - /// If the file already exists, any write calls on it will overwrite its - /// contents, without truncating it. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .write(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn write(&mut self, write: bool) -> &mut OpenOptions { - self.write = write; - self - } - - /// Sets the option for the append mode. - /// - /// This option, when true, means that writes will append to a file instead - /// of overwriting previous contents. Note that setting - /// `.write(true).append(true)` has the same effect as setting only - /// `.append(true)`. - /// - /// For most filesystems, the operating system guarantees that all writes - /// are atomic: no writes get mangled because another process writes at the - /// same time. - /// - /// ## Note - /// - /// This function doesn't create the file if it doesn't exist. Use the - /// [`OpenOptions::create`] method to do so. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .append(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn append(&mut self, append: bool) -> &mut OpenOptions { - self.append = append; - self - } - - /// Sets the option for truncating a previous file. - /// - /// If a file is successfully opened with this option set it will truncate - /// the file to 0 length if it already exists. - /// - /// The file must be opened with write access for truncate to work. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .write(true) - /// .truncate(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions { - self.truncate = truncate; - self - } - - /// Sets the option to create a new file, or open it if it already exists. - /// - /// In order for the file to be created, [`OpenOptions::write`] or - /// [`OpenOptions::append`] access must be used. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .write(true) - /// .create(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn create(&mut self, create: bool) -> &mut OpenOptions { - self.create = create; - self - } - - /// Sets the option to create a new file, failing if it already exists. - /// - /// No file is allowed to exist at the target location, also no (dangling) symlink. In this - /// way, if the call succeeds, the file returned is guaranteed to be new. - /// - /// This option is useful because it is atomic. Otherwise between checking - /// whether a file exists and creating a new one, the file may have been - /// created by another process (a TOCTOU race condition / attack). - /// - /// If `.create_new(true)` is set, [`.create()`] and [`.truncate()`] are - /// ignored. - /// - /// The file must be opened with write or append access in order to create - /// a new file. - /// - /// [`.create()`]: OpenOptions::create - /// [`.truncate()`]: OpenOptions::truncate - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .write(true) - /// .create_new(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions { - self.create_new = create_new; - self - } - - /// Opens a file at `path` with the options specified by `self`. - /// - /// # Errors - /// - /// This function will return an error under a number of different - /// circumstances. Some of these error conditions are listed here, together - /// with their [`io::ErrorKind`]. The mapping to [`io::ErrorKind`]s is not - /// part of the compatibility contract of the function, especially the - /// [`Other`] kind might change to more specific kinds in the future. - /// - /// * [`NotFound`]: The specified file does not exist and neither `create` - /// or `create_new` is set. - /// * [`NotFound`]: One of the directory components of the file path does - /// not exist. - /// * [`PermissionDenied`]: The user lacks permission to get the specified - /// access rights for the file. - /// * [`PermissionDenied`]: The user lacks permission to open one of the - /// directory components of the specified path. - /// * [`AlreadyExists`]: `create_new` was specified and the file already - /// exists. - /// * [`InvalidInput`]: Invalid combinations of open options (truncate - /// without write access, no access mode set, etc.). - /// * [`Other`]: One of the directory components of the specified file path - /// was not, in fact, a directory. - /// * [`Other`]: Filesystem-level errors: full disk, write permission - /// requested on a read-only file system, exceeded disk quota, too many - /// open files, too long filename, too many symbolic links in the - /// specified path (Unix-like systems only), etc. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::OpenOptions; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = OpenOptions::new() - /// .read(true) - /// .open("foo.txt") - /// .await?; - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// [`AlreadyExists`]: io::ErrorKind::AlreadyExists - /// [`InvalidInput`]: io::ErrorKind::InvalidInput - /// [`NotFound`]: io::ErrorKind::NotFound - /// [`Other`]: io::ErrorKind::Other - /// [`PermissionDenied`]: io::ErrorKind::PermissionDenied - pub async fn open(&self, path: impl AsRef) -> io::Result { - let op = Op::open(path.as_ref(), self)?; - - // Await the completion of the event - let completion = op.await; - - // The file is open - Ok(File::from_shared_fd(SharedFd::new(completion.result? as _))) - } - - pub(crate) fn access_mode(&self) -> io::Result { - match (self.read, self.write, self.append) { - (true, false, false) => Ok(libc::O_RDONLY), - (false, true, false) => Ok(libc::O_WRONLY), - (true, true, false) => Ok(libc::O_RDWR), - (false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND), - (true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND), - (false, false, false) => Err(io::Error::from_raw_os_error(libc::EINVAL)), - } - } - - pub(crate) fn creation_mode(&self) -> io::Result { - match (self.write, self.append) { - (true, false) => {} - (false, false) => { - if self.truncate || self.create || self.create_new { - return Err(io::Error::from_raw_os_error(libc::EINVAL)); - } - } - (_, true) => { - if self.truncate && !self.create_new { - return Err(io::Error::from_raw_os_error(libc::EINVAL)); - } - } - } - - Ok(match (self.create, self.truncate, self.create_new) { - (false, false, false) => 0, - (true, false, false) => libc::O_CREAT, - (false, true, false) => libc::O_TRUNC, - (true, true, false) => libc::O_CREAT | libc::O_TRUNC, - (_, _, true) => libc::O_CREAT | libc::O_EXCL, - }) - } -} diff --git a/src/common/tokio_uring/future.rs b/src/common/tokio_uring/future.rs deleted file mode 100644 index 1fd4d0e2c..000000000 --- a/src/common/tokio_uring/future.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -macro_rules! ready { - ($e:expr $(,)?) => { - match $e { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - } - }; -} - -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub(crate) struct PollFn { - f: F, -} - -impl Unpin for PollFn {} - -pub(crate) fn poll_fn(f: F) -> PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (&mut self.f)(cx) - } -} diff --git a/src/common/tokio_uring/mod.rs b/src/common/tokio_uring/mod.rs deleted file mode 100644 index 55d06c827..000000000 --- a/src/common/tokio_uring/mod.rs +++ /dev/null @@ -1,186 +0,0 @@ -//! Tokio-uring provides a safe [io-uring] interface for the Tokio runtime. The -//! library requires Linux kernel 5.10 or later. -//! -//! [io-uring]: https://kernel.dk/io_uring.pdf -//! -//! # Getting started -//! -//! Using `tokio-uring` requires starting a [`tokio-uring`] runtime. This -//! runtime internally manages the main Tokio runtime and a `io-uring` driver. -//! -//! ```no_run -//! use tokio_uring::fs::File; -//! -//! fn main() -> Result<(), Box> { -//! tokio_uring::start(async { -//! // Open a file -//! let file = File::open("hello.txt").await?; -//! -//! let buf = vec![0; 4096]; -//! // Read some data, the buffer is passed by ownership and -//! // submitted to the kernel. When the operation completes, -//! // we get the buffer back. -//! let (res, buf) = file.read_at(buf, 0).await; -//! let n = res?; -//! -//! // Display the contents -//! println!("{:?}", &buf[..n]); -//! -//! Ok(()) -//! }) -//! } -//! ``` -//! -//! Under the hood, `tokio_uring::start` starts a [`current-thread`] Runtime. -//! For concurrency, spawn multiple threads, each with a `tokio-uring` runtime. -//! The `tokio-uring` resource types are optimized for single-threaded usage and -//! most are `!Sync`. -//! -//! # Submit-based operations -//! -//! Unlike Tokio proper, `io-uring` is based on submission based operations. -//! Ownership of resources are passed to the kernel, which then performs the -//! operation. When the operation completes, ownership is passed back to the -//! caller. Because of this difference, the `tokio-uring` APIs diverge. -//! -//! For example, in the above example, reading from a `File` requires passing -//! ownership of the buffer. -//! -//! # Closing resources -//! -//! With `io-uring`, closing a resource (e.g. a file) is an asynchronous -//! operation. Because Rust does not support asynchronous drop yet, resource -//! types provide an explicit `close()` function. If the `close()` function is -//! not called, the resource will still be closed on drop, but the operation -//! will happen in the background. There is no guarantee as to **when** the -//! implicit close-on-drop operation happens, so it is recommended to explicitly -//! call `close()`. - -#![warn(missing_docs)] -#![allow(clippy::needless_borrow)] -#![allow(clippy::assign_op_pattern)] -#![allow(clippy::useless_conversion)] -#![allow(clippy::redundant_closure)] -#![allow(clippy::needless_doctest_main)] -#![allow(clippy::missing_safety_doc)] -#![allow(clippy::new_without_default)] -#![allow(clippy::needless_return)] - -macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - let res = unsafe { libc::$fn($($arg, )*) }; - if res == -1 { - Err(std::io::Error::last_os_error()) - } else { - Ok(res) - } - }}; -} - -#[macro_use] -pub(crate) mod future; -pub(crate) mod driver; -mod runtime; - -pub mod buf; -pub mod fs; -pub mod net; - -pub use runtime::{spawn, Runtime}; - -use std::future::Future; - -/// Start an `io_uring` enabled Tokio runtime. -/// -/// All `tokio-uring` resource types must be used from within the context of a -/// runtime. The `start` method initializes the runtime and runs it for the -/// duration of `future`. -/// -/// The `tokio-uring` runtime is compatible with all Tokio, so it is possible to -/// run Tokio based libraries (e.g. hyper) from within the tokio-uring runtime. -/// A `tokio-uring` runtime consists of a Tokio `current_thread` runtime and an -/// `io-uring` driver. All tasks spawned on the `tokio-uring` runtime are -/// executed on the current thread. To add concurrency, spawn multiple threads, -/// each with a `tokio-uring` runtime. -/// -/// # Examples -/// -/// Basic usage -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::open("hello.txt").await?; -/// -/// let buf = vec![0; 4096]; -/// // Read some data, the buffer is passed by ownership and -/// // submitted to the kernel. When the operation completes, -/// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; -/// -/// // Display the contents -/// println!("{:?}", &buf[..n]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -/// -/// Using Tokio types from the `tokio-uring` runtime -/// -/// -/// ```no_run -/// use tokio::net::TcpListener; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// let listener = TcpListener::bind("127.0.0.1:8080").await?; -/// -/// loop { -/// let (socket, _) = listener.accept().await?; -/// // process socket -/// } -/// }) -/// } -/// ``` -pub fn start(future: F) -> F::Output { - let mut rt = runtime::Runtime::new().unwrap(); - rt.block_on(future) -} - -/// A specialized `Result` type for `io-uring` operations with buffers. -/// -/// This type is used as a return value for asynchronous `io-uring` methods that -/// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned whether or not the operation completed -/// successfully. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::open("hello.txt").await?; -/// -/// let buf = vec![0; 4096]; -/// // Read some data, the buffer is passed by ownership and -/// // submitted to the kernel. When the operation completes, -/// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; -/// -/// // Display the contents -/// println!("{:?}", &buf[..n]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub type BufResult = (std::io::Result, B); diff --git a/src/common/tokio_uring/net/mod.rs b/src/common/tokio_uring/net/mod.rs deleted file mode 100644 index 8d596fdf4..000000000 --- a/src/common/tokio_uring/net/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -//! TCP/UDP bindings for `tokio-uring`. -//! -//! This module contains the TCP/UDP networking types, similar to the standard -//! library, which can be used to implement networking protocols. -//! -//! # Organization -//! -//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP -//! * [`UdpSocket`] provides functionality for communication over UDP - -//! -//! [`TcpListener`]: TcpListener -//! [`TcpStream`]: TcpStream -//! [`UdpSocket`]: UdpSocket - -mod tcp; -mod udp; -mod unix; - -pub use tcp::{TcpListener, TcpStream}; -pub use udp::UdpSocket; -pub use unix::{UnixListener, UnixStream}; diff --git a/src/common/tokio_uring/net/tcp/listener.rs b/src/common/tokio_uring/net/tcp/listener.rs deleted file mode 100644 index b23c6d908..000000000 --- a/src/common/tokio_uring/net/tcp/listener.rs +++ /dev/null @@ -1,69 +0,0 @@ -use super::TcpStream; -use crate::driver::Socket; -use std::{io, net::SocketAddr}; - -/// A TCP socket server, listening for connections. -/// -/// You can accept a new connection by using the [`accept`](`TcpListener::accept`) -/// method. -/// -/// # Examples -/// -/// ``` -/// use tokio_uring::net::TcpListener; -/// use tokio_uring::net::TcpStream; -/// -/// fn main() { -/// let listener = TcpListener::bind("127.0.0.1:2345".parse().unwrap()).unwrap(); -/// -/// tokio_uring::start(async move { -/// let tx_fut = TcpStream::connect("127.0.0.1:2345".parse().unwrap()); -/// -/// let rx_fut = listener.accept(); -/// -/// let (tx, (rx, _)) = tokio::try_join!(tx_fut, rx_fut).unwrap(); -/// -/// tx.write(b"test" as &'static [u8]).await.0.unwrap(); -/// -/// let (_, buf) = rx.read(vec![0; 4]).await; -/// -/// assert_eq!(buf, b"test"); -/// }); -/// } -/// ``` -pub struct TcpListener { - inner: Socket, -} - -impl TcpListener { - /// Creates a new TcpListener, which will be bound to the specified address. - /// - /// The returned listener is ready for accepting connections. - /// - /// Binding with a port number of 0 will request that the OS assigns a port - /// to this listener. - /// - /// In the future, the port allocated can be queried via a (blocking) `local_addr` - /// method. - pub fn bind(addr: SocketAddr) -> io::Result { - let socket = Socket::bind(addr, libc::SOCK_STREAM)?; - socket.listen(1024)?; - return Ok(TcpListener { inner: socket }); - } - - /// Accepts a new incoming connection from this listener. - /// - /// This function will yield once a new TCP connection is established. When - /// established, the corresponding [`TcpStream`] and the remote peer's - /// address will be returned. - /// - /// [`TcpStream`]: struct@crate::net::TcpStream - pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - let (socket, socket_addr) = self.inner.accept().await?; - let stream = TcpStream { inner: socket }; - let socket_addr = socket_addr.ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "Could not get socket IP address") - })?; - Ok((stream, socket_addr)) - } -} diff --git a/src/common/tokio_uring/net/tcp/mod.rs b/src/common/tokio_uring/net/tcp/mod.rs deleted file mode 100644 index 5374b1d14..000000000 --- a/src/common/tokio_uring/net/tcp/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod listener; -pub use listener::TcpListener; - -mod stream; -pub use stream::TcpStream; diff --git a/src/common/tokio_uring/net/tcp/stream.rs b/src/common/tokio_uring/net/tcp/stream.rs deleted file mode 100644 index 21eef90ea..000000000 --- a/src/common/tokio_uring/net/tcp/stream.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::{io, net::SocketAddr}; - -use crate::{ - buf::{IoBuf, IoBufMut}, - driver::Socket, -}; - -/// A TCP stream between a local and a remote socket. -/// -/// A TCP stream can either be created by connecting to an endpoint, via the -/// [`connect`] method, or by [`accepting`] a connection from a [`listener`]. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::net::TcpStream; -/// use std::net::ToSocketAddrs; -/// -/// fn main() -> std::io::Result<()> { -/// tokio_uring::start(async { -/// // Connect to a peer -/// let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?; -/// -/// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).await; -/// result.unwrap(); -/// -/// Ok(()) -/// }) -/// } -/// ``` -/// -/// [`connect`]: TcpStream::connect -/// [`accepting`]: crate::net::TcpListener::accept -/// [`listener`]: crate::net::TcpListener -pub struct TcpStream { - pub(super) inner: Socket, -} - -impl TcpStream { - /// Opens a TCP connection to a remote host at the given `SocketAddr` - pub async fn connect(addr: SocketAddr) -> io::Result { - let socket = Socket::new(addr, libc::SOCK_STREAM)?; - socket.connect(socket2::SockAddr::from(addr)).await?; - let tcp_stream = TcpStream { inner: socket }; - return Ok(tcp_stream); - } - - /// Read some data from the stream into the buffer, returning the original buffer and - /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await - } - - /// Write some data to the stream from the buffer, returning the original buffer and - /// quantity of data written. - pub async fn write(&self, buf: T) -> crate::BufResult { - self.inner.write(buf).await - } -} diff --git a/src/common/tokio_uring/net/udp.rs b/src/common/tokio_uring/net/udp.rs deleted file mode 100644 index 7bebf3679..000000000 --- a/src/common/tokio_uring/net/udp.rs +++ /dev/null @@ -1,189 +0,0 @@ -use crate::{ - buf::{IoBuf, IoBufMut}, - driver::Socket, -}; -use socket2::SockAddr; -use std::{io, net::SocketAddr}; - -/// A UDP socket. -/// -/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` -/// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: -/// -/// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`) -/// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses -/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`write`](`UdpSocket::write`) -/// and [`read`](`UdpSocket::read`) to communicate only with that remote address -/// -/// # Examples -/// Bind and connect a pair of sockets and send a packet: -/// -/// ``` -/// use tokio_uring::net::UdpSocket; -/// use std::net::SocketAddr; -/// fn main() -> std::io::Result<()> { -/// tokio_uring::start(async { -/// let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap(); -/// let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); -/// -/// // bind sockets -/// let socket = UdpSocket::bind(first_addr.clone()).await?; -/// let other_socket = UdpSocket::bind(second_addr.clone()).await?; -/// -/// // connect sockets -/// socket.connect(second_addr).await.unwrap(); -/// other_socket.connect(first_addr).await.unwrap(); -/// -/// let buf = vec![0; 32]; -/// -/// // write data -/// let (result, _) = socket.write(b"hello world".as_slice()).await; -/// result.unwrap(); -/// -/// // read data -/// let (result, buf) = other_socket.read(buf).await; -/// let n_bytes = result.unwrap(); -/// -/// assert_eq!(b"hello world", &buf[..n_bytes]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -/// Send and receive packets without connecting: -/// -/// ``` -/// use tokio_uring::net::UdpSocket; -/// use std::net::SocketAddr; -/// fn main() -> std::io::Result<()> { -/// tokio_uring::start(async { -/// let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap(); -/// let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); -/// -/// // bind sockets -/// let socket = UdpSocket::bind(first_addr.clone()).await?; -/// let other_socket = UdpSocket::bind(second_addr.clone()).await?; -/// -/// let buf = vec![0; 32]; -/// -/// // write data -/// let (result, _) = socket.send_to(b"hello world".as_slice(), second_addr).await; -/// result.unwrap(); -/// -/// // read data -/// let (result, buf) = other_socket.recv_from(buf).await; -/// let (n_bytes, addr) = result.unwrap(); -/// -/// assert_eq!(addr, first_addr); -/// assert_eq!(b"hello world", &buf[..n_bytes]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub struct UdpSocket { - pub(super) inner: Socket, -} - -impl UdpSocket { - /// Creates a new UDP socket and attempt to bind it to the addr provided. - pub async fn bind(socket_addr: SocketAddr) -> io::Result { - let socket = Socket::bind(socket_addr, libc::SOCK_DGRAM)?; - Ok(UdpSocket { inner: socket }) - } - - /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. - /// - /// This function is intended to be used to wrap a UDP socket from the - /// standard library in the Tokio equivalent. The conversion assumes nothing - /// about the underlying socket; it is left up to the user to decide what socket - /// options are appropriate for their use case. - /// - /// This can be used in conjunction with socket2's `Socket` interface to - /// configure a socket before it's handed off, such as setting options like - /// `reuse_address` or binding to multiple addresses. - /// - /// # Example - /// - /// ```ignored - /// use socket2::{Protocol, Socket, Type}; - /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; - /// use tokio_uring::net::UdpSocket; - /// - /// fn main() -> std::io::Result<()> { - /// tokio_uring::start(async { - /// let std_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap(); - /// let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - /// let sock = Socket::new(socket2::Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; - /// sock.set_reuse_port(true)?; - /// sock.set_nonblocking(true)?; - /// sock.bind(&std_addr.into())?; - /// - /// let std_socket = UdpSocket::from_std(sock.into()); - /// let other_socket = UdpSocket::bind(second_addr).await?; - /// - /// let buf = vec![0; 32]; - /// - /// // write data - /// let (result, _) = std_socket - /// .send_to(b"hello world".as_slice(), second_addr) - /// .await; - /// result.unwrap(); - /// - /// // read data - /// let (result, buf) = other_socket.recv_from(buf).await; - /// let (n_bytes, addr) = result.unwrap(); - /// - /// assert_eq!(addr, std_addr); - /// assert_eq!(b"hello world", &buf[..n_bytes]); - /// - /// Ok(()) - /// }) - /// } - /// ``` - pub fn from_std(socket: std::net::UdpSocket) -> UdpSocket { - let inner_socket = Socket::from_std(socket); - Self { - inner: inner_socket, - } - } - - /// Connects this UDP socket to a remote address, allowing the `write` and - /// `read` syscalls to be used to send data and also applies filters to only - /// receive data from the specified address. - /// - /// Note that usually, a successful `connect` call does not specify - /// that there is a remote server listening on the port, rather, such an - /// error would only be detected after the first send. - pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> { - self.inner.connect(SockAddr::from(socket_addr)).await - } - - /// Sends data on the socket to the given address. On success, returns the - /// number of bytes written. - pub async fn send_to( - &self, - buf: T, - socket_addr: SocketAddr, - ) -> crate::BufResult { - self.inner.send_to(buf, socket_addr).await - } - - /// Receives a single datagram message on the socket. On success, returns - /// the number of bytes read and the origin. - pub async fn recv_from(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> { - self.inner.recv_from(buf).await - } - - /// Read a packet of data from the socket into the buffer, returning the original buffer and - /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await - } - - /// Write some data to the socket from the buffer, returning the original buffer and - /// quantity of data written. - pub async fn write(&self, buf: T) -> crate::BufResult { - self.inner.write(buf).await - } -} diff --git a/src/common/tokio_uring/net/unix/listener.rs b/src/common/tokio_uring/net/unix/listener.rs deleted file mode 100644 index d0f81c270..000000000 --- a/src/common/tokio_uring/net/unix/listener.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::UnixStream; -use crate::driver::Socket; -use std::{io, path::Path}; - -/// A Unix socket server, listening for connections. -/// -/// You can accept a new connection by using the [`accept`](`UnixListener::accept`) -/// method. -/// -/// # Examples -/// -/// ``` -/// use tokio_uring::net::UnixListener; -/// use tokio_uring::net::UnixStream; -/// -/// fn main() { -/// let sock_file = "/tmp/tokio-uring-unix-test.sock"; -/// let listener = UnixListener::bind(&sock_file).unwrap(); -/// -/// tokio_uring::start(async move { -/// let tx_fut = UnixStream::connect(&sock_file); -/// -/// let rx_fut = listener.accept(); -/// -/// let (tx, rx) = tokio::try_join!(tx_fut, rx_fut).unwrap(); -/// -/// tx.write(b"test" as &'static [u8]).await.0.unwrap(); -/// -/// let (_, buf) = rx.read(vec![0; 4]).await; -/// -/// assert_eq!(buf, b"test"); -/// }); -/// std::fs::remove_file(&sock_file); -/// } -/// ``` -pub struct UnixListener { - inner: Socket, -} - -impl UnixListener { - /// Creates a new UnixListener, which will be bound to the specified file path. - /// The file path cannnot yet exist, and will be cleaned up upon dropping `UnixListener` - pub fn bind>(path: P) -> io::Result { - let socket = Socket::bind_unix(path, libc::SOCK_STREAM)?; - socket.listen(1024)?; - Ok(UnixListener { inner: socket }) - } - - /// Accepts a new incoming connection from this listener. - /// - /// This function will yield once a new Unix domain socket connection - /// is established. When established, the corresponding [`UnixStream`] and - /// will be returned. - /// - /// [`UnixStream`]: struct@crate::net::UnixStream - pub async fn accept(&self) -> io::Result { - let (socket, _) = self.inner.accept().await?; - let stream = UnixStream { inner: socket }; - Ok(stream) - } -} diff --git a/src/common/tokio_uring/net/unix/mod.rs b/src/common/tokio_uring/net/unix/mod.rs deleted file mode 100644 index 1e30ce357..000000000 --- a/src/common/tokio_uring/net/unix/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod listener; -pub use listener::UnixListener; - -mod stream; -pub use stream::UnixStream; diff --git a/src/common/tokio_uring/net/unix/stream.rs b/src/common/tokio_uring/net/unix/stream.rs deleted file mode 100644 index f312af938..000000000 --- a/src/common/tokio_uring/net/unix/stream.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::{ - buf::{IoBuf, IoBufMut}, - driver::Socket, -}; -use socket2::SockAddr; -use std::{io, path::Path}; - -/// A Unix stream between two local sockets on a Unix OS. -/// -/// A Unix stream can either be created by connecting to an endpoint, via the -/// [`connect`] method, or by [`accepting`] a connection from a [`listener`]. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::net::UnixStream; -/// use std::net::ToSocketAddrs; -/// -/// fn main() -> std::io::Result<()> { -/// tokio_uring::start(async { -/// // Connect to a peer -/// let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?; -/// -/// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).await; -/// result.unwrap(); -/// -/// Ok(()) -/// }) -/// } -/// ``` -/// -/// [`connect`]: UnixStream::connect -/// [`accepting`]: crate::net::UnixListener::accept -/// [`listener`]: crate::net::UnixListener -pub struct UnixStream { - pub(super) inner: Socket, -} - -impl UnixStream { - /// Opens a Unix connection to the specified file path. There must be a - /// `UnixListener` or equivalent listening on the corresponding Unix domain socket - /// to successfully connect and return a `UnixStream`. - pub async fn connect>(path: P) -> io::Result { - let socket = Socket::new_unix(libc::SOCK_STREAM)?; - socket.connect(SockAddr::unix(path)?).await?; - let unix_stream = UnixStream { inner: socket }; - Ok(unix_stream) - } - - /// Read some data from the stream into the buffer, returning the original buffer and - /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await - } - - /// Write some data to the stream from the buffer, returning the original buffer and - /// quantity of data written. - pub async fn write(&self, buf: T) -> crate::BufResult { - self.inner.write(buf).await - } -} diff --git a/src/common/tokio_uring/runtime.rs b/src/common/tokio_uring/runtime.rs deleted file mode 100644 index 97f56d4df..000000000 --- a/src/common/tokio_uring/runtime.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::driver::{Driver, CURRENT}; -use std::cell::RefCell; - -use std::future::Future; -use std::io; -use tokio::io::unix::AsyncFd; -use tokio::task::LocalSet; - -/// The tokio-uring runtime based on the Tokio current thread runtime. -pub struct Runtime { - /// io-uring driver - driver: AsyncFd, - - /// LocalSet for !Send tasks - local: LocalSet, - - /// Tokio runtime, always current-thread - rt: tokio::runtime::Runtime, -} - -/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. -/// -/// Spawning a task enables the task to execute concurrently to other tasks. -/// There is no guarantee that a spawned task will execute to completion. When a -/// runtime is shutdown, all outstanding tasks are dropped, regardless of the -/// lifecycle of that task. -/// -/// This function must be called from the context of a `tokio-uring` runtime. -/// -/// [`JoinHandle`]: tokio::task::JoinHandle -/// -/// # Examples -/// -/// In this example, a server is started and `spawn` is used to start a new task -/// that processes each received connection. -/// -/// ```no_run -/// fn main() { -/// tokio_uring::start(async { -/// let handle = tokio_uring::spawn(async { -/// println!("hello from a background task"); -/// }); -/// -/// // Let the task complete -/// handle.await.unwrap(); -/// }); -/// } -/// ``` -pub fn spawn(task: T) -> tokio::task::JoinHandle { - tokio::task::spawn_local(task) -} - -impl Runtime { - /// Create a new tokio-uring [Runtime] object. - pub fn new() -> io::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .on_thread_park(|| { - CURRENT.with(|x| { - let _ = RefCell::borrow_mut(x).uring.submit(); - }); - }) - .enable_all() - .build()?; - - let local = LocalSet::new(); - - let driver = { - let _guard = rt.enter(); - AsyncFd::new(Driver::new()?)? - }; - - Ok(Runtime { driver, local, rt }) - } - - /// Runs a future to completion on the Tokio-uring runtime. - /// - /// This runs the given future on the current thread, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers - /// which the future spawns internally will be executed on the runtime. - pub fn block_on(&mut self, future: F) -> F::Output - where - F: Future, - { - self.driver.get_ref().with(|| { - let drive = async { - loop { - // Wait for read-readiness - let mut guard = self.driver.readable().await.unwrap(); - self.driver.get_ref().tick(); - guard.clear_ready(); - } - }; - - tokio::pin!(drive); - tokio::pin!(future); - - self.rt - .block_on(self.local.run_until(crate::future::poll_fn(|cx| { - assert!(drive.as_mut().poll(cx).is_pending()); - future.as_mut().poll(cx) - }))) - }) - } -}