Skip to content

Commit

Permalink
fix: rework SharedFd (#232)
Browse files Browse the repository at this point in the history
This allows a 'file.close.wait' to return a true close return value. And
it fixes the problem of indefinitely hanging when the SharedFd was still
held by an in-flight operation.

It simplifies the low level close logic, using the async uring close
operation when the caller is using file.close.wait but using the
synchronous close mechanism provided by the std library when the
SharedFd is allowed to go out of scope without having been closed first.

Manual closing of the file should be encouraged because it allows the
user to see the return code.

In order to let the file owner call close and not hang when an in-flight
was still in progress, this reworks the future to return when it sees
the strong reference count drop to 1.

Fixes #122.
  • Loading branch information
FrankReh authored Feb 10, 2023
1 parent afab7f2 commit 96ad633
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 93 deletions.
20 changes: 11 additions & 9 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,14 +824,17 @@ impl File {
Op::statx(&self.fd)?.await
}

/// Closes the file.
/// Closes the file using the uring asynchronous close operation and returns the possible error
/// as described in the close(2) man page.
///
/// The method completes once the close operation has completed,
/// guaranteeing that resources associated with the file have been released.
/// The programmer has the choice of calling this asynchronous close and waiting for the result
/// or letting the library close the file automatically and simply letting the file go out of
/// scope and having the library close the file descriptor automatically and synchronously.
///
/// If `close` is not called before dropping the file, the file is closed in
/// the background, but there is no guarantee as to **when** the close
/// operation will complete.
/// Calling this asynchronous close is to be preferred because it returns the close result
/// which as the man page points out, should not be ignored. This asynchronous close also
/// avoids the synchronous close system call and may result in better throughput as the thread
/// is not blocked during the close.
///
/// # Examples
///
Expand All @@ -849,9 +852,8 @@ impl File {
/// })
/// }
/// ```
pub async fn close(self) -> io::Result<()> {
self.fd.close().await;
Ok(())
pub async fn close(mut self) -> io::Result<()> {
self.fd.close().await
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/future.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// TODO see about removing or just commenting out.
#[allow(unused_macros)]
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
Expand Down
1 change: 0 additions & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod accept;

mod close;
pub(crate) use close::Close;

mod connect;

Expand Down
166 changes: 84 additions & 82 deletions src/io/shared_fd.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use crate::io::Close;
use std::future::poll_fn;

use std::cell::RefCell;
use std::os::unix::io::{FromRawFd, RawFd};
use std::rc::Rc;
use std::task::Waker;
use std::{
cell::RefCell,
io,
os::unix::io::{FromRawFd, RawFd},
rc::Rc,
task::Waker,
};

use crate::runtime::driver::op::Op;
use crate::runtime::CONTEXT;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
//
// If the runtime is unavailable, will fall back to synchronous Close to ensure
// File resources are not leaked.
// When closing the file descriptor because it is going out of scope, a synchronous close is
// employed.
//
// The closed state is tracked so close calls after the first are ignored.
// Only the first close call returns the true result of closing the file descriptor.
#[derive(Clone)]
pub(crate) struct SharedFd {
inner: Rc<Inner>,
Expand All @@ -23,21 +27,19 @@ struct Inner {
// Open file descriptor
fd: RawFd,

// Waker to notify when the close operation completes.
// Track the sharing state of the file descriptor:
// normal, being waited on to allow a close by the parent's owner, or already closed.
state: RefCell<State>,
}

enum State {
/// Initial state
Init,

/// Waiting for all in-flight operation to complete.
Waiting(Option<Waker>),

/// The FD is closing
Closing(Op<Close>),
/// Waiting for the number of strong Rc pointers to drop to 1.
WaitingForUniqueness(Waker),

/// The FD is fully closed
/// The close has been triggered by the parent owner.
Closed,
}

Expand All @@ -60,99 +62,99 @@ impl SharedFd {
/// This prevents bugs where in-flight reads could operate on the incorrect
/// file descriptor.
///
/// TO model this, if there are no in-flight operations, then
pub(crate) async fn close(mut self) {
// Get a mutable reference to Inner, indicating there are no
// in-flight operations on the FD.
if let Some(inner) = Rc::get_mut(&mut self.inner) {
// Submit the close operation
inner.submit_close_op();
}

self.inner.closed().await;
}
}

impl Inner {
/// If there are no in-flight operations, submit the operation.
fn submit_close_op(&mut self) {
// Close the FD
let state = RefCell::get_mut(&mut self.state);

// Submit a close operation
// If either:
// - runtime has already closed, or
// - submitting the Close operation fails
// we fall back on a synchronous `close`. This is safe as, at this point,
// we guarantee all in-flight operations have completed. The most
// common cause for an error is attempting to close the FD while
// off runtime.
//
// This is done by initializing a `File` with the FD and
// dropping it.
//
// TODO: Should we warn?
*state = match CONTEXT.try_with(|cx| cx.is_set()) {
Ok(true) => match Op::close(self.fd) {
Ok(op) => State::Closing(op),
Err(_) => {
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };
State::Closed
}
},
_ => {
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };
State::Closed
pub(crate) async fn close(&mut self) -> io::Result<()> {
loop {
// Get a mutable reference to Inner, indicating there are no
// in-flight operations on the FD.
if let Some(inner) = Rc::get_mut(&mut self.inner) {
// Wait for the close operation.
return inner.async_close_op().await;
}
};

self.sharedfd_is_unique().await;
}
}

/// Completes when the FD has been closed.
async fn closed(&self) {
use std::future::Future;
use std::pin::Pin;
/// Completes when the SharedFd's Inner Rc strong count is 1.
/// Gets polled any time a SharedFd is dropped.
async fn sharedfd_is_unique(&self) {
use std::task::Poll;

poll_fn(|cx| {
let mut state = self.state.borrow_mut();
if Rc::<Inner>::strong_count(&self.inner) == 1 {
return Poll::Ready(());
}

let mut state = self.inner.state.borrow_mut();

match &mut *state {
State::Init => {
*state = State::Waiting(Some(cx.waker().clone()));
*state = State::WaitingForUniqueness(cx.waker().clone());
Poll::Pending
}
State::Waiting(Some(waker)) => {
State::WaitingForUniqueness(waker) => {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}

Poll::Pending
}
State::Waiting(None) => {
*state = State::Waiting(Some(cx.waker().clone()));
Poll::Pending
}
State::Closing(op) => {
// Nothing to do if the close opeation failed.
let _ = ready!(Pin::new(op).poll(cx));
*state = State::Closed;
Poll::Ready(())
}
State::Closed => Poll::Ready(()),
}
})
.await;
}
}

impl Drop for Inner {
impl Inner {
async fn async_close_op(&mut self) -> io::Result<()> {
// &mut self implies there are no outstanding operations.
// If state already closed, the user closed multiple times; simply return Ok.
// Otherwise, set state to closed and then submit and await the uring close operation.
{
// Release state guard before await.
let state = RefCell::get_mut(&mut self.state);

if let State::Closed = *state {
return Ok(());
}

*state = State::Closed;
}
Op::close(self.fd)?.await
}
}

impl Drop for SharedFd {
fn drop(&mut self) {
// Submit the close operation, if needed
match RefCell::get_mut(&mut self.state) {
State::Init | State::Waiting(..) => {
self.submit_close_op();
// If the SharedFd state is Waiting
// The job of the SharedFd's drop is to possibly wake a task that is waiting for the
// reference count to go down.
use std::mem;

let mut state = self.inner.state.borrow_mut();
if let State::WaitingForUniqueness(_) = *state {
let state = &mut *state;
if let State::WaitingForUniqueness(waker) = mem::replace(state, State::Init) {
// Wake the task wanting to close this SharedFd and let it try again. If it finds
// there are no more outstanding clones, it will succeed. Otherwise it will start a new
// Future, waiting for another SharedFd to be dropped.
waker.wake()
}
_ => {}
}
}
}

impl Drop for Inner {
fn drop(&mut self) {
// If the inner state isn't `Closed`, the user hasn't called close().await
// so do it synchronously.

let state = self.state.borrow_mut();

if let State::Closed = *state {
return;
}
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };
}
}
1 change: 1 addition & 0 deletions src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl RuntimeContext {
}

/// Check if driver is initialized
#[allow(dead_code)]
pub(crate) fn is_set(&self) -> bool {
self.driver
.try_borrow()
Expand Down
2 changes: 1 addition & 1 deletion tests/fs_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,6 @@ fn assert_invalid_fd(fd: RawFd) {

match f.read_to_end(&mut buf) {
Err(ref e) if e.raw_os_error() == Some(libc::EBADF) => {}
res => panic!("{:?}", res),
res => panic!("assert_invalid_fd finds for fd {:?}, res = {:?}", fd, res),
}
}

0 comments on commit 96ad633

Please sign in to comment.