Skip to content

Commit

Permalink
Revert "Stageless: close the finish channel so executor doesn't deadl…
Browse files Browse the repository at this point in the history
…ock (bevyengine#7448)"

This reverts commit ff7d5ff.
  • Loading branch information
maniwani committed Feb 6, 2023
1 parent 2e20d04 commit 466d133
Showing 1 changed file with 21 additions and 46 deletions.
67 changes: 21 additions & 46 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default;
use bevy_utils::syncunsafecell::SyncUnsafeCell;
Expand Down Expand Up @@ -186,10 +184,11 @@ impl SystemExecutor for MultiThreadedExecutor {

if self.num_running_systems > 0 {
// wait for systems to complete
let index =
self.receiver.recv().await.expect(
"A system has panicked so the executor cannot continue.",
);
let index = self
.receiver
.recv()
.await
.unwrap_or_else(|error| unreachable!("{}", error));

self.finish_system_and_signal_dependents(index);

Expand Down Expand Up @@ -439,22 +438,14 @@ impl MultiThreadedExecutor {
let task = async move {
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
// SAFETY: access is compatible
unsafe { system.run_unsafe((), world) };
}));
// SAFETY: access is compatible
unsafe { system.run_unsafe((), world) };
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
}
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
};

#[cfg(feature = "trace")]
Expand Down Expand Up @@ -497,21 +488,13 @@ impl MultiThreadedExecutor {
let task = async move {
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
apply_system_buffers(&unapplied_systems, systems, world);
}));
apply_system_buffers(&unapplied_systems, systems, world);
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
}
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
};

#[cfg(feature = "trace")]
Expand All @@ -521,21 +504,13 @@ impl MultiThreadedExecutor {
let task = async move {
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
system.run((), world);
}));
system.run((), world);
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
}
sender
.send(system_index)
.await
.unwrap_or_else(|error| unreachable!("{}", error));
};

#[cfg(feature = "trace")]
Expand Down

0 comments on commit 466d133

Please sign in to comment.