Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ?Sized types in some methods and structs #1647

Merged
merged 1 commit into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ pub trait FutureExt: Future {

/// A convenience for calling `Future::poll` on `Unpin` future types.
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
where Self: Unpin + Sized
where Self: Unpin
{
Pin::new(self).poll(cx)
}
Expand Down
40 changes: 21 additions & 19 deletions futures-util/src/lock/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicUsize, Ordering};

/// A futures-aware mutex.
pub struct Mutex<T> {
pub struct Mutex<T: ?Sized> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we want to reveal that this is allocated separately-- I had imagined at some point we might want to switch to using e.g. parking_lot's Mutex type which prevents an allocation here. I don't feel super strongly about it, though, since futures-util should be okay to make breaking changes to in the future. Your choice :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cramertj
I'm sorry, I'm not sure what is the problem.

  • In this pr's implementation, if T is !Sized, then Mutex itself becomes !Sized.
    use futures::lock::Mutex;
    use std::sync::Arc;
    fn foo() -> Arc<Foo<dyn Fn()>> {
        let m = Foo::new(|| {});
        Arc::new(m) as Arc<_>
    }
    fn bar() -> Foo<dyn Fn()> { //~ ERROR the size for values of type `(dyn std::ops::Fn() + 'static)` cannot be known at compilation time
        let m = Foo::new(|| {});
        m as _
    }
  • parking_lot's Mutex allows T: ?Sized, and if T is !Sized, the behavior is the same as this pr's implementation: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=a4ecc3eecc9fa5cdce41bebd5f97cf5e.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see-- I was thinking that the Mutex here was still Sized even if T: ?Sized.

state: AtomicUsize,
value: UnsafeCell<T>,
waiters: StdMutex<Slab<Waiter>>,
value: UnsafeCell<T>,
}

impl<T> fmt::Debug for Mutex<T> {
impl<T: ?Sized> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
f.debug_struct("Mutex")
Expand Down Expand Up @@ -65,7 +65,9 @@ impl<T> Mutex<T> {
waiters: StdMutex::new(Slab::new()),
}
}
}

impl<T: ?Sized> Mutex<T> {
/// Attempt to acquire the lock immediately.
///
/// If the lock is currently held, this will return `None`.
Expand Down Expand Up @@ -116,13 +118,13 @@ impl<T> Mutex<T> {
const WAIT_KEY_NONE: usize = usize::MAX;

/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, T> {
pub struct MutexLockFuture<'a, T: ?Sized> {
// `None` indicates that the mutex was successfully acquired.
mutex: Option<&'a Mutex<T>>,
wait_key: usize,
}

impl<T> fmt::Debug for MutexLockFuture<'_, T> {
impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MutexLockFuture")
.field("was_acquired", &self.mutex.is_none())
Expand All @@ -138,13 +140,13 @@ impl<T> fmt::Debug for MutexLockFuture<'_, T> {
}
}

impl<T> FusedFuture for MutexLockFuture<'_, T> {
impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> {
fn is_terminated(&self) -> bool {
self.mutex.is_none()
}
}

impl<'a, T> Future for MutexLockFuture<'a, T> {
impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> {
type Output = MutexGuard<'a, T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -180,7 +182,7 @@ impl<'a, T> Future for MutexLockFuture<'a, T> {
}
}

impl<T> Drop for MutexLockFuture<'_, T> {
impl<T: ?Sized> Drop for MutexLockFuture<'_, T> {
fn drop(&mut self) {
if let Some(mutex) = self.mutex {
// This future was dropped before it acquired the mutex.
Expand All @@ -195,11 +197,11 @@ impl<T> Drop for MutexLockFuture<'_, T> {
/// An RAII guard returned by the `lock` and `try_lock` methods.
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
pub struct MutexGuard<'a, T> {
pub struct MutexGuard<'a, T: ?Sized> {
mutex: &'a Mutex<T>,
}

impl<T: fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MutexGuard")
.field("value", &*self)
Expand All @@ -208,7 +210,7 @@ impl<T: fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
}
}

impl<T> Drop for MutexGuard<'_, T> {
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
let old_state = self.mutex.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
if (old_state & HAS_WAITERS) != 0 {
Expand All @@ -220,31 +222,31 @@ impl<T> Drop for MutexGuard<'_, T> {
}
}

impl<T> Deref for MutexGuard<'_, T> {
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
}
}

impl<T> DerefMut for MutexGuard<'_, T> {
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
}
}

// Mutexes can be moved freely between threads and acquired on any thread so long
// as the inner value can be safely sent between threads.
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: Send> Send for MutexLockFuture<'_, T> {}
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T> Sync for MutexLockFuture<'_, T> {}
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}

// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
10 changes: 4 additions & 6 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ pub trait StreamExt: Stream {
/// # });
/// ```
fn next(&mut self) -> Next<'_, Self>
where Self: Sized + Unpin,
where Self: Unpin,
{
Next::new(self)
}
Expand Down Expand Up @@ -830,9 +830,7 @@ pub trait StreamExt: Stream {
/// assert_eq!(sum, 7);
/// # });
/// ```
fn by_ref(&mut self) -> &mut Self
where Self: Sized
{
fn by_ref(&mut self) -> &mut Self {
self
}

Expand Down Expand Up @@ -1142,7 +1140,7 @@ pub trait StreamExt: Stream {
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>
where Self: Unpin + Sized
where Self: Unpin
{
Pin::new(self).poll_next(cx)
}
Expand Down Expand Up @@ -1196,7 +1194,7 @@ pub trait StreamExt: Stream {
/// assert_eq!(total, 6);
/// # });
/// ```
fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Sized + Unpin + FusedStream {
fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream {
SelectNextSome::new(self)
}
}
10 changes: 5 additions & 5 deletions futures-util/src/stream/next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ use futures_core::task::{Context, Poll};
/// Future for the [`next`](super::StreamExt::next) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, St> {
pub struct Next<'a, St: ?Sized> {
stream: &'a mut St,
}

impl<St: Stream + Unpin> Unpin for Next<'_, St> {}
impl<St: ?Sized + Stream + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: Stream + Unpin> Next<'a, St> {
impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Next { stream }
}
}

impl<St: FusedStream> FusedFuture for Next<'_, St> {
impl<St: ?Sized + FusedStream> FusedFuture for Next<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: Stream + Unpin> Future for Next<'_, St> {
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
type Output = Option<St::Item>;

fn poll(
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/select_next_some.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ use crate::stream::StreamExt;
/// method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectNextSome<'a, St> {
pub struct SelectNextSome<'a, St: ?Sized> {
stream: &'a mut St,
}

impl<'a, St> SelectNextSome<'a, St> {
impl<'a, St: ?Sized> SelectNextSome<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
SelectNextSome { stream }
}
}

impl<St: FusedStream> FusedFuture for SelectNextSome<'_, St> {
impl<St: ?Sized + FusedStream> FusedFuture for SelectNextSome<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: Stream + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
impl<St: ?Sized + Stream + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
type Output = St::Item;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub trait TryStreamExt: TryStream {
/// # })
/// ```
fn try_next(&mut self) -> TryNext<'_, Self>
where Self: Sized + Unpin,
where Self: Unpin,
{
TryNext::new(self)
}
Expand Down
10 changes: 5 additions & 5 deletions futures-util/src/try_stream/try_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ use futures_core::task::{Context, Poll};
/// Future for the [`try_next`](super::TryStreamExt::try_next) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNext<'a, St: Unpin> {
pub struct TryNext<'a, St: ?Sized + Unpin> {
stream: &'a mut St,
}

impl<St: Unpin> Unpin for TryNext<'_, St> {}
impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {}

impl<'a, St: TryStream + Unpin> TryNext<'a, St> {
impl<'a, St: ?Sized + TryStream + Unpin> TryNext<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
TryNext { stream }
}
}

impl<St: Unpin + FusedStream> FusedFuture for TryNext<'_, St> {
impl<St: ?Sized + Unpin + FusedStream> FusedFuture for TryNext<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: TryStream + Unpin> Future for TryNext<'_, St> {
impl<St: ?Sized + TryStream + Unpin> Future for TryNext<'_, St> {
type Output = Result<Option<St::Ok>, St::Error>;

fn poll(
Expand Down