Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent niche-optimization on futures to avoid triggering miri #6744

Merged
merged 5 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions tokio/src/future/maybe_done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pin_project! {
#[derive(Debug)]
#[project = MaybeDoneProj]
#[project_replace = MaybeDoneProjReplace]
#[repr(C)] // https://github.com/rust-lang/miri/issues/3780
pub enum MaybeDone<Fut: Future> {
/// A not-yet-completed future.
Future { #[pin] future: Fut },
Expand Down Expand Up @@ -69,3 +70,54 @@ impl<Fut: Future> Future for MaybeDone<Fut> {
Poll::Ready(())
}
}

// Test for https://github.com/tokio-rs/tokio/issues/6729
#[cfg(test)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also have #[cfg(miri)] on it? or do we also want these tests to run outside of miri?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, it doesn't really hurt to run it outside of miri too. 🤷‍♀️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough!

mod miri_tests {
use super::maybe_done;

use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll, Wake},
};

struct ThingAdder<'a> {
thing: &'a mut String,
}

impl Future for ThingAdder<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
*self.get_unchecked_mut().thing += ", world";
}
Poll::Pending
}
}

#[test]
fn maybe_done_miri() {
let mut thing = "hello".to_owned();

// The async block is necessary to trigger the miri failure.
#[allow(clippy::redundant_async_block)]
let fut = async move { ThingAdder { thing: &mut thing }.await };

let mut fut = maybe_done(fut);
let mut fut = unsafe { Pin::new_unchecked(&mut fut) };

let waker = Arc::new(DummyWaker).into();
let mut ctx = Context::from_waker(&waker);
assert_eq!(fut.as_mut().poll(&mut ctx), Poll::Pending);
assert_eq!(fut.as_mut().poll(&mut ctx), Poll::Pending);
}

struct DummyWaker;

impl Wake for DummyWaker {
fn wake(self: Arc<Self>) {}
}
}
1 change: 1 addition & 0 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ generate_addr_of_methods! {
}

/// Either the future or the output.
#[repr(C)] // https://github.com/rust-lang/miri/issues/3780
pub(super) enum Stage<T: Future> {
Running(T),
Finished(super::Result<T::Output>),
Expand Down
94 changes: 94 additions & 0 deletions tokio/src/runtime/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,100 @@ fn shutdown_immediately() {
})
}

// Test for https://github.com/tokio-rs/tokio/issues/6729
#[test]
fn spawn_niche_in_task() {
use crate::future::poll_fn;
use std::task::{Context, Poll, Waker};

with(|rt| {
let state = Arc::new(Mutex::new(State::new()));

let mut subscriber = Subscriber::new(Arc::clone(&state), 1);
rt.spawn(async move {
subscriber.wait().await;
subscriber.wait().await;
});

rt.spawn(async move {
state.lock().unwrap().set_version(2);
state.lock().unwrap().set_version(0);
});

rt.tick_max(10);
assert!(rt.is_empty());
rt.shutdown();
});

pub(crate) struct Subscriber {
state: Arc<Mutex<State>>,
observed_version: u64,
waker_key: Option<usize>,
}

impl Subscriber {
pub(crate) fn new(state: Arc<Mutex<State>>, version: u64) -> Self {
Self {
state,
observed_version: version,
waker_key: None,
}
}

pub(crate) async fn wait(&mut self) {
poll_fn(|cx| {
self.state
.lock()
.unwrap()
.poll_update(&mut self.observed_version, &mut self.waker_key, cx)
.map(|_| ())
})
.await;
}
}

struct State {
version: u64,
wakers: Vec<Waker>,
}

impl State {
pub(crate) fn new() -> Self {
Self {
version: 1,
wakers: Vec::new(),
}
}

pub(crate) fn poll_update(
&mut self,
observed_version: &mut u64,
waker_key: &mut Option<usize>,
cx: &Context<'_>,
) -> Poll<Option<()>> {
if self.version == 0 {
*waker_key = None;
Poll::Ready(None)
} else if *observed_version < self.version {
*waker_key = None;
*observed_version = self.version;
Poll::Ready(Some(()))
} else {
self.wakers.push(cx.waker().clone());
*waker_key = Some(self.wakers.len());
Poll::Pending
}
}

pub(crate) fn set_version(&mut self, version: u64) {
self.version = version;
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
}

#[test]
fn spawn_during_shutdown() {
static DID_SPAWN: AtomicBool = AtomicBool::new(false);
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/macros_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn join_size() {
let ready2 = future::ready(0i32);
tokio::join!(ready1, ready2)
};
assert_eq!(mem::size_of_val(&fut), 40);
assert_eq!(mem::size_of_val(&fut), 48);
}

async fn non_cooperative_task(permits: Arc<Semaphore>) -> usize {
Expand Down
Loading