diff --git a/src/queue/multilevel.rs b/src/queue/multilevel.rs index 0b0045a..a676149 100644 --- a/src/queue/multilevel.rs +++ b/src/queue/multilevel.rs @@ -405,7 +405,7 @@ impl LevelManager { std::cmp::min(total_tasks / level_0_tasks, LEVEL_MAX_QUEUE_MAX_STEAL_SIZE) }; self.max_level_queue_steal_size - .store(new_steal_count as usize, SeqCst); + .store(new_steal_count, SeqCst); for (i, c) in self.last_exec_tasks_per_level.iter().enumerate() { c.set(cur_total_tasks_per_level[i]); } @@ -457,7 +457,7 @@ pub(crate) struct ElapsedTime(AtomicU64); impl ElapsedTime { pub(crate) fn as_duration(&self) -> Duration { - Duration::from_micros(self.0.load(Relaxed) as u64) + Duration::from_micros(self.0.load(Relaxed)) } pub(super) fn inc_by(&self, t: Duration) { diff --git a/src/queue/priority.rs b/src/queue/priority.rs index a3b02c9..c4cab61 100644 --- a/src/queue/priority.rs +++ b/src/queue/priority.rs @@ -9,8 +9,9 @@ //! information. use std::{ + cell::RefCell, sync::{ - atomic::{AtomicPtr, AtomicU64, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, time::{Duration, Instant}, @@ -156,32 +157,25 @@ impl QueueCore { } } -/// A holder to store task. We wrap the value in an atomic ptr because the return value of pop() -/// only provide readonly reference to this value, though in our can it's safe to just take it. +/// A holder to store task. Wrap the task in a RefCell becuase crossbeam-skip only provide +/// readonly acess to a popped Entry. struct Slot { - ptr: AtomicPtr, + //ptr: AtomicPtr, + value: RefCell>, } +// It is safe here because the value is only visited by the thread which calls `pop()`. +unsafe impl Sync for Slot {} + impl Slot { fn new(value: T) -> Self { Self { - ptr: AtomicPtr::new(Box::into_raw(Box::new(value))), + value: RefCell::new(Some(value)), } } fn take(&self) -> Option { - let raw_ptr = self.ptr.swap(std::ptr::null_mut(), Ordering::SeqCst); - if !raw_ptr.is_null() { - unsafe { Some(*Box::from_raw(raw_ptr)) } - } else { - None - } - } -} - -impl Drop for Slot { - fn drop(&mut self) { - self.take(); + self.value.take() } } @@ -350,7 +344,7 @@ impl Builder { .collect(); let injector = TaskInjector { - queue: queue, + queue, task_manager: self.manager, };