From 96ad6339b1f367c070725a44ec56a28ac9f6fb0d Mon Sep 17 00:00:00 2001 From: FrankReh Date: Thu, 9 Feb 2023 22:44:24 -0500 Subject: [PATCH] fix: rework SharedFd (#232) This allows a 'file.close.wait' to return a true close return value. And it fixes the problem of indefinitely hanging when the SharedFd was still held by an in-flight operation. It simplifies the low level close logic, using the async uring close operation when the caller is using file.close.wait but using the synchronous close mechanism provided by the std library when the SharedFd is allowed to go out of scope without having been closed first. Manual closing of the file should be encouraged because it allows the user to see the return code. In order to let the file owner call close and not hang when an in-flight was still in progress, this reworks the future to return when it sees the strong reference count drop to 1. Fixes #122. --- src/fs/file.rs | 20 ++--- src/future.rs | 2 + src/io/mod.rs | 1 - src/io/shared_fd.rs | 166 +++++++++++++++++++++-------------------- src/runtime/context.rs | 1 + tests/fs_file.rs | 2 +- 6 files changed, 99 insertions(+), 93 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index c25ce30b..d1e55d9a 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -824,14 +824,17 @@ impl File { Op::statx(&self.fd)?.await } - /// Closes the file. + /// Closes the file using the uring asynchronous close operation and returns the possible error + /// as described in the close(2) man page. /// - /// The method completes once the close operation has completed, - /// guaranteeing that resources associated with the file have been released. + /// The programmer has the choice of calling this asynchronous close and waiting for the result + /// or letting the library close the file automatically and simply letting the file go out of + /// scope and having the library close the file descriptor automatically and synchronously. /// - /// If `close` is not called before dropping the file, the file is closed in - /// the background, but there is no guarantee as to **when** the close - /// operation will complete. + /// Calling this asynchronous close is to be preferred because it returns the close result + /// which as the man page points out, should not be ignored. This asynchronous close also + /// avoids the synchronous close system call and may result in better throughput as the thread + /// is not blocked during the close. /// /// # Examples /// @@ -849,9 +852,8 @@ impl File { /// }) /// } /// ``` - pub async fn close(self) -> io::Result<()> { - self.fd.close().await; - Ok(()) + pub async fn close(mut self) -> io::Result<()> { + self.fd.close().await } } diff --git a/src/future.rs b/src/future.rs index 1f48623b..54aa94f1 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,3 +1,5 @@ +// TODO see about removing or just commenting out. +#[allow(unused_macros)] macro_rules! ready { ($e:expr $(,)?) => { match $e { diff --git a/src/io/mod.rs b/src/io/mod.rs index fe30e99f..7fe2233d 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,7 +1,6 @@ mod accept; mod close; -pub(crate) use close::Close; mod connect; diff --git a/src/io/shared_fd.rs b/src/io/shared_fd.rs index d573265f..e3423238 100644 --- a/src/io/shared_fd.rs +++ b/src/io/shared_fd.rs @@ -1,19 +1,23 @@ -use crate::io::Close; use std::future::poll_fn; -use std::cell::RefCell; -use std::os::unix::io::{FromRawFd, RawFd}; -use std::rc::Rc; -use std::task::Waker; +use std::{ + cell::RefCell, + io, + os::unix::io::{FromRawFd, RawFd}, + rc::Rc, + task::Waker, +}; use crate::runtime::driver::op::Op; -use crate::runtime::CONTEXT; // Tracks in-flight operations on a file descriptor. Ensures all in-flight // operations complete before submitting the close. // -// If the runtime is unavailable, will fall back to synchronous Close to ensure -// File resources are not leaked. +// When closing the file descriptor because it is going out of scope, a synchronous close is +// employed. +// +// The closed state is tracked so close calls after the first are ignored. +// Only the first close call returns the true result of closing the file descriptor. #[derive(Clone)] pub(crate) struct SharedFd { inner: Rc, @@ -23,7 +27,8 @@ struct Inner { // Open file descriptor fd: RawFd, - // Waker to notify when the close operation completes. + // Track the sharing state of the file descriptor: + // normal, being waited on to allow a close by the parent's owner, or already closed. state: RefCell, } @@ -31,13 +36,10 @@ enum State { /// Initial state Init, - /// Waiting for all in-flight operation to complete. - Waiting(Option), - - /// The FD is closing - Closing(Op), + /// Waiting for the number of strong Rc pointers to drop to 1. + WaitingForUniqueness(Waker), - /// The FD is fully closed + /// The close has been triggered by the parent owner. Closed, } @@ -60,84 +62,43 @@ impl SharedFd { /// This prevents bugs where in-flight reads could operate on the incorrect /// file descriptor. /// - /// TO model this, if there are no in-flight operations, then - pub(crate) async fn close(mut self) { - // Get a mutable reference to Inner, indicating there are no - // in-flight operations on the FD. - if let Some(inner) = Rc::get_mut(&mut self.inner) { - // Submit the close operation - inner.submit_close_op(); - } - - self.inner.closed().await; - } -} - -impl Inner { - /// If there are no in-flight operations, submit the operation. - fn submit_close_op(&mut self) { - // Close the FD - let state = RefCell::get_mut(&mut self.state); - - // Submit a close operation - // If either: - // - runtime has already closed, or - // - submitting the Close operation fails - // we fall back on a synchronous `close`. This is safe as, at this point, - // we guarantee all in-flight operations have completed. The most - // common cause for an error is attempting to close the FD while - // off runtime. - // - // This is done by initializing a `File` with the FD and - // dropping it. - // - // TODO: Should we warn? - *state = match CONTEXT.try_with(|cx| cx.is_set()) { - Ok(true) => match Op::close(self.fd) { - Ok(op) => State::Closing(op), - Err(_) => { - let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; - State::Closed - } - }, - _ => { - let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; - State::Closed + pub(crate) async fn close(&mut self) -> io::Result<()> { + loop { + // Get a mutable reference to Inner, indicating there are no + // in-flight operations on the FD. + if let Some(inner) = Rc::get_mut(&mut self.inner) { + // Wait for the close operation. + return inner.async_close_op().await; } - }; + + self.sharedfd_is_unique().await; + } } - /// Completes when the FD has been closed. - async fn closed(&self) { - use std::future::Future; - use std::pin::Pin; + /// Completes when the SharedFd's Inner Rc strong count is 1. + /// Gets polled any time a SharedFd is dropped. + async fn sharedfd_is_unique(&self) { use std::task::Poll; poll_fn(|cx| { - let mut state = self.state.borrow_mut(); + if Rc::::strong_count(&self.inner) == 1 { + return Poll::Ready(()); + } + + let mut state = self.inner.state.borrow_mut(); match &mut *state { State::Init => { - *state = State::Waiting(Some(cx.waker().clone())); + *state = State::WaitingForUniqueness(cx.waker().clone()); Poll::Pending } - State::Waiting(Some(waker)) => { + State::WaitingForUniqueness(waker) => { if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } Poll::Pending } - State::Waiting(None) => { - *state = State::Waiting(Some(cx.waker().clone())); - Poll::Pending - } - State::Closing(op) => { - // Nothing to do if the close opeation failed. - let _ = ready!(Pin::new(op).poll(cx)); - *state = State::Closed; - Poll::Ready(()) - } State::Closed => Poll::Ready(()), } }) @@ -145,14 +106,55 @@ impl Inner { } } -impl Drop for Inner { +impl Inner { + async fn async_close_op(&mut self) -> io::Result<()> { + // &mut self implies there are no outstanding operations. + // If state already closed, the user closed multiple times; simply return Ok. + // Otherwise, set state to closed and then submit and await the uring close operation. + { + // Release state guard before await. + let state = RefCell::get_mut(&mut self.state); + + if let State::Closed = *state { + return Ok(()); + } + + *state = State::Closed; + } + Op::close(self.fd)?.await + } +} + +impl Drop for SharedFd { fn drop(&mut self) { - // Submit the close operation, if needed - match RefCell::get_mut(&mut self.state) { - State::Init | State::Waiting(..) => { - self.submit_close_op(); + // If the SharedFd state is Waiting + // The job of the SharedFd's drop is to possibly wake a task that is waiting for the + // reference count to go down. + use std::mem; + + let mut state = self.inner.state.borrow_mut(); + if let State::WaitingForUniqueness(_) = *state { + let state = &mut *state; + if let State::WaitingForUniqueness(waker) = mem::replace(state, State::Init) { + // Wake the task wanting to close this SharedFd and let it try again. If it finds + // there are no more outstanding clones, it will succeed. Otherwise it will start a new + // Future, waiting for another SharedFd to be dropped. + waker.wake() } - _ => {} } } } + +impl Drop for Inner { + fn drop(&mut self) { + // If the inner state isn't `Closed`, the user hasn't called close().await + // so do it synchronously. + + let state = self.state.borrow_mut(); + + if let State::Closed = *state { + return; + } + let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; + } +} diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 4014d38d..c194c510 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -33,6 +33,7 @@ impl RuntimeContext { } /// Check if driver is initialized + #[allow(dead_code)] pub(crate) fn is_set(&self) -> bool { self.driver .try_borrow() diff --git a/tests/fs_file.rs b/tests/fs_file.rs index d9cb7f7e..739fea56 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -342,6 +342,6 @@ fn assert_invalid_fd(fd: RawFd) { match f.read_to_end(&mut buf) { Err(ref e) if e.raw_os_error() == Some(libc::EBADF) => {} - res => panic!("{:?}", res), + res => panic!("assert_invalid_fd finds for fd {:?}, res = {:?}", fd, res), } }