[Merged by Bors] - Stageless: close the finish channel so executor doesn't deadlock #7448

67 changes: 46 additions & 21 deletions crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs
@@ -1,3 +1,5 @@
+use std::panic::AssertUnwindSafe;

@cart (Member), Feb 3, 2023

One alternative that wasn't listed and that wouldn't affect per-system perf: we could add a scope.spawn_last() function that awaits those tasks only after the normal tasks have finished. It would mean adding another ConcurrentQueue to Scope, which would increase the size of Scope, add another bleh transmute into the equation, and allocate unless we use ConcurrentQueue::bounded(1).

@cart (Member), Feb 3, 2023

I'm clearly missing something because this doesn't work as I expected when I implement spawn_last with a ConcurrentQueue and use scope.spawn_last(executor):

let get_results = async {
    let mut results =
        Vec::with_capacity(spawned_ref.len() + spawned_last_ref.len());
    let mut index = 0;
    let mut task_failed = false;
    while let Ok(task) = spawned_ref.pop() {
        if task_failed {
            task.cancel().await;
        } else {
            if let Some(result) = task.await {
                unsafe { *results.get_unchecked_mut(index) = result };
            } else {
                task_failed = true;
            }
        }
        index += 1;
    }
    // this detects _no_ failed tasks when a system panics. index is 0
    // prints "false 0"
    println!("{task_failed} {index}");

    while let Ok(task) = spawned_last_ref.pop() {
        if task_failed {
            task.cancel().await;
        } else {
            if let Some(result) = task.await {
                unsafe { *results.get_unchecked_mut(index) = result };
            } else {
                task_failed = true;
            }
        }
        index += 1;
    }

    // this does not run because we hit the deadlock above for the executor task
    println!("{task_failed} {index}");
    
    if task_failed {
        panic!("At least one task failed");
    }
    unsafe { results.set_len(index) };
    results
};

@cart (Member), Feb 3, 2023

Apparently this scope isn't registering the system task at all?

Member

I think I answered my own question on Discord:
[image]

Member

I don't think spawn_last is an option here.

Contributor Author

If you're using the stageless branch, this looks very wrong: the first time you enter get_results, the only task might be the executor task. When that task runs, it spawns the system tasks, so get_results needs to be able to handle more tasks spawning while it runs.
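
The point about tasks that spawn further tasks can be illustrated without bevy_tasks. The following is a minimal sketch using only std, assuming a hypothetical single-threaded queue (`Task`, `Queue`, `run_snapshot`, `run_until_empty`, and `executor_only_queue` are illustrative names, not Bevy APIs). It contrasts a collector that only looks at the tasks present on entry with one that re-checks the queue after every task.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

// Hypothetical stand-ins for a scope's task list; not bevy_tasks types.
struct Task(Box<dyn FnOnce(&Queue) -> u32>);
type Queue = RefCell<VecDeque<Task>>;

/// Runs only the tasks that were queued when the function was entered.
fn run_snapshot(queue: &Queue) -> Vec<u32> {
    let initial = queue.borrow().len();
    let mut results = Vec::new();
    for _ in 0..initial {
        // Pop in its own statement so the borrow ends before the task runs.
        let next = queue.borrow_mut().pop_front();
        if let Some(task) = next {
            results.push((task.0)(queue));
        }
    }
    results
}

/// Re-checks the queue after every task, so tasks pushed by a running
/// task are collected as well.
fn run_until_empty(queue: &Queue) -> Vec<u32> {
    let mut results = Vec::new();
    loop {
        let next = queue.borrow_mut().pop_front();
        match next {
            Some(task) => results.push((task.0)(queue)),
            None => break,
        }
    }
    results
}

/// Builds a queue holding a single "executor" task that spawns two
/// "system" tasks when it runs.
fn executor_only_queue() -> Queue {
    let queue: Queue = RefCell::new(VecDeque::new());
    queue.borrow_mut().push_back(Task(Box::new(|q: &Queue| {
        q.borrow_mut().push_back(Task(Box::new(|_: &Queue| 1)));
        q.borrow_mut().push_back(Task(Box::new(|_: &Queue| 2)));
        0
    })));
    queue
}

fn main() {
    println!("snapshot:    {:?}", run_snapshot(&executor_only_queue()));
    println!("until empty: {:?}", run_until_empty(&executor_only_queue()));
}
```

The snapshot variant returns after the executor task and never sees the two tasks it spawned; the second variant collects all three.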

+
 use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
 use bevy_utils::default;
 use bevy_utils::syncunsafecell::SyncUnsafeCell;
@@ -175,11 +177,10 @@ impl SystemExecutor for MultiThreadedExecutor {

 if self.num_running_systems > 0 {
     // wait for systems to complete
-    let index = self
-        .receiver
-        .recv()
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    let index =
+        self.receiver.recv().await.expect(
+            "A system has panicked so the executor cannot continue.",
+        );

     self.finish_system_and_signal_dependents(index);
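
The hunk above relies on `recv()` returning an error once the channel is closed, instead of waiting forever. A minimal sketch of that behaviour follows. It assumes `std::sync::mpsc` as a stand-in for the async channel used in the PR, and it closes the channel by dropping the only sender (std has no explicit `close()` on the sender). `observe` is a hypothetical helper name.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

/// Returns what the receiver observes while a sender is alive but silent,
/// and whether `recv()` fails once every sender is gone.
fn observe() -> (RecvTimeoutError, bool) {
    let (tx, rx) = channel::<usize>();

    // Case 1: the sender exists but never sends. A plain `recv()` would
    // block indefinitely here, so a timeout makes the stall observable.
    let stalled = rx.recv_timeout(Duration::from_millis(50)).unwrap_err();

    // Case 2: the channel is closed. With std mpsc this happens when the
    // last sender is dropped.
    drop(tx);
    let closed = rx.recv().is_err();

    (stalled, closed)
}

fn main() {
    let (stalled, closed) = observe();
    println!("while open: {stalled:?}, recv fails after close: {closed}");
}
```

The first case corresponds to the deadlock in the PR title: a panicked system never sends its index, so the executor keeps waiting. The second case is what closing the channel buys.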

@@ -429,14 +430,22 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    // SAFETY: access is compatible
-    unsafe { system.run_unsafe((), world) };
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        // SAFETY: access is compatible
+        unsafe { system.run_unsafe((), world) };
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]
@@ -479,13 +488,21 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    apply_system_buffers(&unapplied_systems, systems, world);
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        apply_system_buffers(&unapplied_systems, systems, world);
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]
@@ -495,13 +512,21 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    system.run((), world);
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        system.run((), world);
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]
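
The three task hunks share one pattern: run the system inside `catch_unwind`, send the index on success, and close the channel on panic so the executor stops waiting. Below is a minimal end-to-end sketch of that pattern, assuming std threads and `std::sync::mpsc` in place of the task pool and async channel. `spawn_system`, `run`, `ok_system`, and `panicking_system` are hypothetical names. Unlike an explicit `sender.close()`, the std channel only closes once every sender clone has been dropped, so the receiver may still see indices from systems that succeeded before it sees the error.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{channel, Sender};
use std::thread;

type System = Box<dyn FnOnce() + Send>;

/// Runs `system` on a worker thread. On success it reports `index`; on
/// panic it sends nothing, and dropping `sender` helps close the channel.
fn spawn_system(index: usize, sender: Sender<usize>, system: System) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // AssertUnwindSafe: the caller vouches that nothing the closure
        // touches is used in an inconsistent state after a panic.
        let res = catch_unwind(AssertUnwindSafe(system));
        if res.is_ok() {
            // The receiver may already be gone; ignored in this sketch.
            let _ = sender.send(index);
        }
        // `sender` is dropped here in both branches.
    })
}

/// Waits for every system, or returns an error once the channel closes
/// before all of them have reported in.
fn run(systems: Vec<System>) -> Result<Vec<usize>, &'static str> {
    let (tx, rx) = channel();
    let count = systems.len();
    let handles: Vec<_> = systems
        .into_iter()
        .enumerate()
        .map(|(i, s)| spawn_system(i, tx.clone(), s))
        .collect();
    drop(tx); // only the workers hold senders from here on

    let mut finished = Vec::with_capacity(count);
    let mut outcome = Ok(());
    while finished.len() < count {
        match rx.recv() {
            Ok(i) => finished.push(i),
            Err(_) => {
                outcome = Err("a system panicked");
                break;
            }
        }
    }
    for handle in handles {
        handle.join().expect("panics are caught inside the worker");
    }
    finished.sort_unstable();
    outcome.map(|()| finished)
}

fn ok_system() {}

fn panicking_system() {
    panic!("boom");
}

fn main() {
    let good: Vec<System> = vec![Box::new(ok_system), Box::new(ok_system)];
    println!("{:?}", run(good));
    let bad: Vec<System> = vec![Box::new(ok_system), Box::new(panicking_system)];
    println!("{:?}", run(bad));
}
```

In the second call the receiver loop ends with an error instead of blocking on an index that will never arrive, which is the behaviour the PR is after.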