Skip to content

Commit

Permalink
Fixing par_read targetting 0.14 branch (#14014)
Browse files Browse the repository at this point in the history
Fix for par_read bugs retargetting 0.14.

Remakes #13836.
  • Loading branch information
BobG1983 authored Jun 25, 2024
1 parent b56a693 commit a41ed78
Showing 1 changed file with 33 additions and 16 deletions.
49 changes: 33 additions & 16 deletions crates/bevy_ecs/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Event handling types.
use crate as bevy_ecs;
#[cfg(feature = "multi_threaded")]
use crate::batching::BatchingStrategy;
use crate::change_detection::MutUntyped;
use crate::{
Expand Down Expand Up @@ -509,6 +510,7 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> {
/// assert_eq!(counter.into_inner(), 4950);
/// ```
///
#[cfg(feature = "multi_threaded")]
pub fn par_read(&mut self) -> EventParIter<'_, E> {
self.reader.par_read(&self.events)
}
Expand Down Expand Up @@ -722,6 +724,7 @@ impl<E: Event> ManualEventReader<E> {
}

/// See [`EventReader::par_read`]
#[cfg(feature = "multi_threaded")]
pub fn par_read<'a>(&'a mut self, events: &'a Events<E>) -> EventParIter<'a, E> {
EventParIter::new(self, events)
}
Expand Down Expand Up @@ -890,13 +893,16 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
}

/// A parallel iterator over `Event`s.
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventParIter<'a, E: Event> {
reader: &'a mut ManualEventReader<E>,
slices: [&'a [EventInstance<E>]; 2],
batching_strategy: BatchingStrategy,
unread: usize,
}

#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> EventParIter<'a, E> {
/// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
pub fn new(reader: &'a mut ManualEventReader<E>, events: &'a Events<E>) -> Self {
Expand All @@ -918,6 +924,7 @@ impl<'a, E: Event> EventParIter<'a, E> {
reader,
slices: [a, b],
batching_strategy: BatchingStrategy::default(),
unread: unread_count,
}
}

Expand Down Expand Up @@ -953,7 +960,7 @@ impl<'a, E: Event> EventParIter<'a, E> {
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(self, func: FN) {
pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
{
self.into_iter().for_each(|(e, i)| func(e, i));
Expand Down Expand Up @@ -983,6 +990,10 @@ impl<'a, E: Event> EventParIter<'a, E> {
});
}
});

// Events are guaranteed to be read at this point.
self.reader.last_event_count += self.unread;
self.unread = 0;
}
}

Expand All @@ -997,6 +1008,7 @@ impl<'a, E: Event> EventParIter<'a, E> {
}
}

#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
type IntoIter = EventIteratorWithId<'a, E>;
type Item = <Self::IntoIter as Iterator>::Item;
Expand Down Expand Up @@ -1572,29 +1584,34 @@ mod tests {
#[cfg(feature = "multi_threaded")]
#[test]
fn test_events_par_iter() {
use std::{collections::HashSet, sync::mpsc};

use crate::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Resource)]
struct Counter(AtomicUsize);

let mut world = World::new();
world.init_resource::<Events<TestEvent>>();
for i in 0..100 {
world.send_event(TestEvent { i });
for _ in 0..100 {
world.send_event(TestEvent { i: 1 });
}

let mut schedule = Schedule::default();
schedule.add_systems(
|mut events: EventReader<TestEvent>, counter: ResMut<Counter>| {
events.par_read().for_each(|event| {
counter.0.fetch_add(event.i, Ordering::Relaxed);
});
},
);
world.insert_resource(Counter(AtomicUsize::new(0)));
schedule.run(&mut world);
let counter = world.remove_resource::<Counter>().unwrap();
assert_eq!(counter.0.into_inner(), 100);

schedule.add_systems(|mut events: EventReader<TestEvent>| {
let (tx, rx) = mpsc::channel();
events.par_read().for_each(|event| {
tx.send(event.i).unwrap();
});
drop(tx);

let observed: HashSet<_> = rx.into_iter().collect();
assert_eq!(observed, HashSet::from_iter(0..100));
});
world.insert_resource(Counter(AtomicUsize::new(0)));
schedule.run(&mut world);
let counter = world.remove_resource::<Counter>().unwrap();
assert_eq!(counter.0.into_inner(), 0);
}

#[test]
Expand Down

0 comments on commit a41ed78

Please sign in to comment.