Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve safety for the multi-threaded executor using UnsafeWorldCell #8292

Merged
merged 33 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6816767
use `UnsafeWorldCell` for system impls
JoJoJet Apr 2, 2023
6c25b97
add `ReadOnlySystem::run_read_only`
JoJoJet Apr 2, 2023
ea4f902
validate world in `run_read_only`
JoJoJet Apr 2, 2023
87f267f
use `UnsafeWorldCell` in the system executor
JoJoJet Apr 2, 2023
f7ff99d
use elided lifetimes in the system executor
JoJoJet Apr 2, 2023
bf82d9d
remove an unnecessary unsafe block
JoJoJet Apr 2, 2023
6f73a7d
improve docs for `run_read_only`
JoJoJet Apr 2, 2023
f026dd2
modify safety invariants for `System::run_unsafe`
JoJoJet Apr 2, 2023
46a44a5
tweak some phrasing
JoJoJet Apr 2, 2023
9b30215
simplify a condition-folding invariant
JoJoJet Apr 2, 2023
df64a3c
improve safety invariants in the executor
JoJoJet Apr 2, 2023
d8e70b4
use a more appropriate world access function
JoJoJet Apr 2, 2023
6e036cd
return `Tick` from `UnsafeWorldCell::increment_change_tick
JoJoJet Apr 2, 2023
0fb60ca
fix lifetimes
JoJoJet Apr 2, 2023
21066eb
improve docs for `System::run`
JoJoJet Apr 3, 2023
a1d47d3
make condition evaluation function unsafe
JoJoJet Apr 3, 2023
c621dc4
Merge remote-tracking branch 'upstream/main' into system-unsafe-world…
JoJoJet Apr 3, 2023
106ae64
Merge remote-tracking branch 'upstream/main' into system-unsafe-world…
JoJoJet Apr 4, 2023
0cd81e8
add `UnsafeWorldCell::id`
JoJoJet Apr 4, 2023
2da604f
use `UnsafeWorldCell` for `update_archetype_component_access`
JoJoJet Apr 4, 2023
3eedf5c
update callsites in doctests
JoJoJet Apr 4, 2023
9b934c0
split an unsafe block
JoJoJet Apr 4, 2023
432d6d7
update some benchmarks
JoJoJet Apr 4, 2023
07f9302
tweak a safety comment
JoJoJet Apr 4, 2023
c32b69b
Merge remote-tracking branch 'upstream/main' into system-unsafe-world…
JoJoJet Apr 12, 2023
59dd4b7
simplify `impl SystemParam for WorldId`
JoJoJet Apr 13, 2023
a3d537d
Merge remote-tracking branch 'upstream/main' into system-unsafe-world…
JoJoJet Apr 17, 2023
840dd95
fix formatting for a comment
JoJoJet Apr 17, 2023
02334b5
remove an apostrophe
JoJoJet Apr 19, 2023
2615867
rephrase a safety comment
JoJoJet Apr 19, 2023
702f09f
rephrase another safety comment
JoJoJet Apr 19, 2023
5e11ad5
Apply suggestions from code review
JoJoJet May 9, 2023
4e6630d
Update crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
JoJoJet May 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn heavy_compute(c: &mut Criterion) {

let mut system = IntoSystem::into_system(sys);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());

b.iter(move || system.run((), &mut world));
});
Expand Down
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/iter_simple_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Benchmark {

let mut system = IntoSystem::into_system(query_system);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
Self(world, Box::new(system))
}

Expand Down
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/world/world_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub fn query_get_component_simple(criterion: &mut Criterion) {

let mut system = IntoSystem::into_system(query_system);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());

bencher.iter(|| system.run(entity, &mut world));
});
Expand Down
5 changes: 3 additions & 2 deletions crates/bevy_ecs/src/schedule/condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::ops::Not;
use crate::component::{self, ComponentId};
use crate::query::Access;
use crate::system::{CombinatorSystem, Combine, IntoSystem, ReadOnlySystem, System};
use crate::world::unsafe_world_cell::UnsafeWorldCell;
use crate::world::World;

pub type BoxedCondition = Box<dyn ReadOnlySystem<In = (), Out = bool>>;
Expand Down Expand Up @@ -978,7 +979,7 @@ where
self.condition.is_exclusive()
}

unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
// SAFETY: The inner condition system asserts its own safety.
!self.condition.run_unsafe(input, world)
}
Expand All @@ -995,7 +996,7 @@ where
self.condition.initialize(world);
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
self.condition.update_archetype_component_access(world);
}

Expand Down
89 changes: 55 additions & 34 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule,
},
system::BoxedSystem,
world::World,
world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use crate as bevy_ecs;
Expand Down Expand Up @@ -184,7 +184,6 @@ impl SystemExecutor for MultiThreadedExecutor {
.map(|e| e.0.clone());
let thread_executor = thread_executor.as_deref();

let world = SyncUnsafeCell::from_mut(world);
let SyncUnsafeSchedule {
systems,
mut conditions,
Expand All @@ -197,10 +196,13 @@ impl SystemExecutor for MultiThreadedExecutor {
// the executor itself is a `Send` future so that it can run
// alongside systems that claim the local thread
let executor = async {
let world_cell = world.as_unsafe_world_cell();
while self.num_completed_systems < self.num_systems {
// SAFETY: self.ready_systems does not contain running systems
// SAFETY:
// - self.ready_systems does not contain running systems.
// - `world_cell` has mutable access to the entire world.
unsafe {
self.spawn_system_tasks(scope, systems, &mut conditions, world);
self.spawn_system_tasks(scope, systems, &mut conditions, world_cell);
}

if self.num_running_systems > 0 {
Expand Down Expand Up @@ -231,7 +233,7 @@ impl SystemExecutor for MultiThreadedExecutor {
if self.apply_final_buffers {
// Do one final apply buffers after all systems have completed
// Commands should be applied while on the scope's thread, not the executor's thread
let res = apply_system_buffers(&self.unapplied_systems, systems, world.get_mut());
let res = apply_system_buffers(&self.unapplied_systems, systems, world);
if let Err(payload) = res {
let mut panic_payload = self.panic_payload.lock().unwrap();
*panic_payload = Some(payload);
Expand Down Expand Up @@ -283,14 +285,16 @@ impl MultiThreadedExecutor {
}

/// # Safety
/// Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - `world_cell` must have permission to access all world data (not counting
/// any world data that is claimed by systems currently running on this executor).
unsafe fn spawn_system_tasks<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
conditions: &mut Conditions,
cell: &'scope SyncUnsafeCell<World>,
world_cell: UnsafeWorldCell<'scope>,
) {
if self.exclusive_running {
return;
Expand All @@ -307,10 +311,7 @@ impl MultiThreadedExecutor {
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *systems[system_index].get() };

// SAFETY: No exclusive system is running.
// Therefore, there is no existing mutable reference to the world.
let world = unsafe { &*cell.get() };
if !self.can_run(system_index, system, conditions, world) {
if !self.can_run(system_index, system, conditions, world_cell) {
// NOTE: exclusive systems with ambiguities are susceptible to
// being significantly displaced here (compared to single-threaded order)
// if systems after them in topological order can run
Expand All @@ -320,9 +321,10 @@ impl MultiThreadedExecutor {

self.ready_systems.set(system_index, false);

// SAFETY: Since `self.can_run` returned true earlier, it must have called
// `update_archetype_component_access` for each run condition.
if !self.should_run(system_index, system, conditions, world) {
// SAFETY: `can_run` returned true, which means that:
// - It must have called `update_archetype_component_access` for each run condition.
// - There can be no systems running whose accesses would conflict with any conditions.
if !self.should_run(system_index, system, conditions, world_cell) {
self.skip_system_and_signal_dependents(system_index);
continue;
}
Expand All @@ -331,20 +333,23 @@ impl MultiThreadedExecutor {
self.num_running_systems += 1;

if self.system_task_metadata[system_index].is_exclusive {
// SAFETY: `can_run` confirmed that no systems are running.
// Therefore, there is no existing reference to the world.
// SAFETY: `can_run` returned true for this system, which means
// that no other systems currently have access to the world.
let world = unsafe { world_cell.world_mut() };
// SAFETY: `can_run` returned true for this system,
// which means no systems are currently borrowed.
unsafe {
let world = &mut *cell.get();
self.spawn_exclusive_system_task(scope, system_index, systems, world);
}
break;
}

// SAFETY:
// - No other reference to this system exists.
// - `self.can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` has been called, which calls `update_archetype_component_access` with this system.
// - `can_run` returned true, so no systems with conflicing world access are running.
JoJoJet marked this conversation as resolved.
Show resolved Hide resolved
unsafe {
self.spawn_system_task(scope, system_index, systems, world);
self.spawn_system_task(scope, system_index, systems, world_cell);
}
}

Expand All @@ -357,7 +362,7 @@ impl MultiThreadedExecutor {
system_index: usize,
system: &mut BoxedSystem,
conditions: &mut Conditions,
world: &World,
world: UnsafeWorldCell,
) -> bool {
let system_meta = &self.system_task_metadata[system_index];
if system_meta.is_exclusive && self.num_running_systems > 0 {
Expand Down Expand Up @@ -413,15 +418,17 @@ impl MultiThreadedExecutor {
}

/// # Safety
///
/// `update_archetype_component` must have been called with `world`
/// for each run condition in `conditions`.
/// * `world` must have permission to read any world data required by
/// the system's conditions: this includes conditions for the system
/// itself, and conditions for any of the system's sets.
/// * `update_archetype_component` must have been called with `world`
/// for each run condition in `conditions`.
unsafe fn should_run(
&mut self,
system_index: usize,
_system: &BoxedSystem,
conditions: &mut Conditions,
world: &World,
world: UnsafeWorldCell,
) -> bool {
let mut should_run = !self.skipped_systems.contains(system_index);
for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
Expand All @@ -430,7 +437,10 @@ impl MultiThreadedExecutor {
}

// Evaluate the system set's conditions.
// SAFETY: `update_archetype_component_access` has been called for each run condition.
// SAFETY:
// - The caller ensures that `world` has permission to read any data
// required by the conditions.
// - `update_archetype_component_access` has been called for each run condition.
let set_conditions_met =
evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world);

Expand All @@ -444,7 +454,10 @@ impl MultiThreadedExecutor {
}

// Evaluate the system's conditions.
// SAFETY: `update_archetype_component_access` has been called for each run condition.
// SAFETY:
// - The caller ensures that `world` has permission to read any data
// required by the conditions.
// - `update_archetype_component_access` has been called for each run condition.
let system_conditions_met =
evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world);

Expand All @@ -459,14 +472,16 @@ impl MultiThreadedExecutor {

/// # Safety
/// - Caller must not alias systems that are running.
/// - `world` must have permission to access the world data
/// used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
/// on the system assocaited with `system_index`.
unsafe fn spawn_system_task<'scope>(
&mut self,
scope: &Scope<'_, 'scope, ()>,
system_index: usize,
systems: &'scope [SyncUnsafeCell<BoxedSystem>],
world: &'scope World,
world: UnsafeWorldCell<'scope>,
) {
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *systems[system_index].get() };
Expand All @@ -483,7 +498,8 @@ impl MultiThreadedExecutor {
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
// SAFETY:
// - Access: TODO.
// - The caller ensures that we have permission to
// access the world data used by the system.
// - `update_archetype_component_access` has been called.
unsafe { system.run_unsafe((), world) };
}));
Expand Down Expand Up @@ -688,18 +704,23 @@ fn apply_system_buffers(
}

/// # Safety
///
/// `update_archetype_component_access` must have been called
/// with `world` for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World) -> bool {
/// - `world` must have permission to read any world data
/// required by `conditions`.
/// - `update_archetype_component_access` must have been called
/// with `world` for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(
conditions: &mut [BoxedCondition],
world: UnsafeWorldCell,
) -> bool {
// not short-circuiting is intentional
#[allow(clippy::unnecessary_fold)]
conditions
.iter_mut()
.map(|condition| {
#[cfg(feature = "trace")]
let _condition_span = info_span!("condition", name = &*condition.name()).entered();
// SAFETY: caller ensures system access is compatible
// SAFETY: The caller ensures that `world` has permission to
// access any data required by the condition.
unsafe { condition.run_unsafe((), world) }
})
.fold(true, |acc, res| acc && res)
Expand Down
5 changes: 3 additions & 2 deletions crates/bevy_ecs/src/system/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
component::{ComponentId, Tick},
prelude::World,
query::Access,
world::unsafe_world_cell::UnsafeWorldCell,
};

use super::{ReadOnlySystem, System};
Expand Down Expand Up @@ -157,7 +158,7 @@ where
self.a.is_exclusive() || self.b.is_exclusive()
}

unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
Func::combine(
input,
// SAFETY: The world accesses for both underlying systems have been registered,
Expand Down Expand Up @@ -198,7 +199,7 @@ where
self.component_access.extend(self.b.component_access());
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
self.a.update_archetype_component_access(world);
self.b.update_archetype_component_access(world);

Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/system/exclusive_function_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
check_system_change_tick, ExclusiveSystemParam, ExclusiveSystemParamItem, In, IntoSystem,
System, SystemMeta,
},
world::World,
world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use bevy_utils::all_tuples;
Expand Down Expand Up @@ -86,7 +86,7 @@ where
}

#[inline]
unsafe fn run_unsafe(&mut self, _input: Self::In, _world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, _input: Self::In, _world: UnsafeWorldCell) -> Self::Out {
panic!("Cannot run exclusive systems with a shared World reference");
}

Expand Down Expand Up @@ -134,7 +134,7 @@ where
self.param_state = Some(F::Param::init(world, &mut self.system_meta));
}

fn update_archetype_component_access(&mut self, _world: &World) {}
fn update_archetype_component_access(&mut self, _world: UnsafeWorldCell) {}

#[inline]
fn check_change_tick(&mut self, change_tick: Tick) {
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_ecs/src/system/function_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
prelude::FromWorld,
query::{Access, FilteredAccessSet},
system::{check_system_change_tick, ReadOnlySystemParam, System, SystemParam, SystemParamItem},
world::{World, WorldId},
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
};

use bevy_utils::all_tuples;
Expand Down Expand Up @@ -476,7 +476,7 @@ where
}

#[inline]
unsafe fn run_unsafe(&mut self, input: Self::In, world: &World) -> Self::Out {
unsafe fn run_unsafe(&mut self, input: Self::In, world: UnsafeWorldCell) -> Self::Out {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really suprised we can elide the lifetime here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference is to always write UnsafeWorldCell<'_> to make it obvious this is happening.

let change_tick = world.increment_change_tick();

// SAFETY:
Expand All @@ -487,7 +487,7 @@ where
let params = F::Param::get_param(
self.param_state.as_mut().expect(Self::PARAM_MESSAGE),
&self.system_meta,
world.as_unsafe_world_cell_migration_internal(),
world,
change_tick,
);
let out = self.func.run(input, params);
Expand Down Expand Up @@ -516,7 +516,7 @@ where
self.param_state = Some(F::Param::init_state(world, &mut self.system_meta));
}

fn update_archetype_component_access(&mut self, world: &World) {
fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
assert!(self.world_id == Some(world.id()), "Encountered a mismatched World. A System cannot be used with Worlds other than the one it was initialized with.");
let archetypes = world.archetypes();
let new_generation = archetypes.generation();
Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ mod tests {

// set up system and verify its access is empty
system.initialize(&mut world);
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand Down Expand Up @@ -1353,7 +1353,7 @@ mod tests {
world.spawn((B, C));

// update system and verify its accesses are correct
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand All @@ -1371,7 +1371,7 @@ mod tests {
.unwrap(),
);
world.spawn((A, B, D));
system.update_archetype_component_access(&world);
system.update_archetype_component_access(world.as_unsafe_world_cell());
assert_eq!(
system
.archetype_component_access()
Expand Down
Loading