Skip to content

Commit

Permalink
rt: coop should yield using waker defer strategy (#7185)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche authored Mar 4, 2025
1 parent a2b12bd commit 710bc80
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
10 changes: 9 additions & 1 deletion tokio/src/task/coop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ cfg_coop! {

Poll::Ready(restore)
} else {
cx.waker().wake_by_ref();
defer(cx);
Poll::Pending
}
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
Expand All @@ -325,11 +325,19 @@ cfg_coop! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}

fn defer(cx: &mut Context<'_>) {
context::defer(cx.waker());
}
}

cfg_not_rt! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}

fn defer(cx: &mut Context<'_>) {
cx.waker().wake_by_ref();
}
}

impl Budget {
Expand Down
34 changes: 31 additions & 3 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,25 @@ rt_test! {
#[cfg_attr(miri, ignore)] // No `socket` in miri.
fn yield_defers_until_park() {
for _ in 0..10 {
if yield_defers_until_park_inner() {
if yield_defers_until_park_inner(false) {
// test passed
return;
}

// Wait a bit and run the test again.
std::thread::sleep(std::time::Duration::from_secs(2));
}

panic!("yield_defers_until_park is failing consistently");
}

/// Same as above, but with cooperative scheduling.
#[test]
#[cfg(not(target_os="wasi"))]
#[cfg_attr(miri, ignore)] // No `socket` in miri.
fn coop_yield_defers_until_park() {
for _ in 0..10 {
if yield_defers_until_park_inner(true) {
// test passed
return;
}
Expand All @@ -760,10 +778,12 @@ rt_test! {
/// Implementation of `yield_defers_until_park` test. Returns `true` if the
/// test passed.
#[cfg(not(target_os="wasi"))]
fn yield_defers_until_park_inner() -> bool {
fn yield_defers_until_park_inner(use_coop: bool) -> bool {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Barrier;

const BUDGET: usize = 128;

let rt = rt();

let flag = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -802,7 +822,15 @@ rt_test! {
// Yield until connected
let mut cnt = 0;
while !flag_clone.load(SeqCst){
tokio::task::yield_now().await;
if use_coop {
// Consume a good chunk of budget, which should
// force at least one yield.
for _ in 0..BUDGET {
tokio::task::consume_budget().await;
}
} else {
tokio::task::yield_now().await;
}
cnt += 1;

if cnt >= 10 {
Expand Down

0 comments on commit 710bc80

Please sign in to comment.