diff --git a/may_queue/Cargo.toml b/may_queue/Cargo.toml index ee64378e..bebb1fd5 100644 --- a/may_queue/Cargo.toml +++ b/may_queue/Cargo.toml @@ -16,6 +16,9 @@ build = "build.rs" smallvec = "1" crossbeam-utils = "0.8" +[dev-dependencies] +crossbeam-queue = "0.3" + [build-dependencies] rustversion = "1.0" diff --git a/may_queue/src/mpsc.rs b/may_queue/src/mpsc.rs index ef0eb2a8..467c3a77 100644 --- a/may_queue/src/mpsc.rs +++ b/may_queue/src/mpsc.rs @@ -448,6 +448,18 @@ mod test { } } + impl ScBlockPop for crossbeam_queue::SegQueue { + fn block_pop(&self) -> T { + let backoff = Backoff::new(); + loop { + match self.pop() { + Some(v) => return v, + None => backoff.snooze(), + } + } + } + } + #[test] fn queue_sanity() { let q = Queue::::new(); @@ -497,6 +509,27 @@ mod test { }); } + #[bench] + fn multi_crossbeam_1p1c_test(b: &mut Bencher) { + b.iter(|| { + let q = Arc::new(crossbeam_queue::SegQueue::new()); + let total_work: usize = 1_000_000; + // create worker threads that generate mono increasing index + let _q = q.clone(); + // in other thread the value should be still 100 + thread::spawn(move || { + for i in 0..total_work { + _q.push(i); + } + }); + + for i in 0..total_work { + let v = q.block_pop(); + assert_eq!(i, v); + } + }); + } + #[bench] fn multi_2p1c_test(b: &mut Bencher) { b.iter(|| { @@ -529,6 +562,38 @@ mod test { }); } + #[bench] + fn multi_crossbeam_2p1c_test(b: &mut Bencher) { + b.iter(|| { + let q = Arc::new(crossbeam_queue::SegQueue::new()); + let total_work: usize = 1_000_000; + // create worker threads that generate mono increasing index + // in other thread the value should be still 100 + let mut total = 0; + + thread::scope(|s| { + let threads = 20; + for i in 0..threads { + let q = q.clone(); + s.spawn(move || { + let len = total_work / threads; + let start = i * len; + for v in start..start + len { + q.push(v); + } + }); + } + s.spawn(|| { + for _i in 0..total_work { + total += q.block_pop(); + } + }); + }); + assert!(q.is_empty()); + assert_eq!(total, (0..total_work).sum::()); + }); + } + #[bench] fn bulk_pop_1p1c_bench(b: &mut Bencher) { b.iter(|| { diff --git a/may_queue/src/spmc.rs b/may_queue/src/spmc.rs index 542c985c..329a3fd9 100644 --- a/may_queue/src/spmc.rs +++ b/may_queue/src/spmc.rs @@ -615,6 +615,27 @@ mod test { }); } + #[bench] + fn multi_crossbeam_1p1c_test(b: &mut Bencher) { + b.iter(|| { + let q = Arc::new(crossbeam_queue::SegQueue::new()); + let total_work: usize = 1_000_000; + // create worker threads that generate mono increasing index + let _q = q.clone(); + // in other thread the value should be still 100 + thread::spawn(move || { + for i in 0..total_work { + _q.push(i); + } + }); + + for i in 0..total_work { + let v = q.block_pop(); + assert_eq!(i, v); + } + }); + } + #[bench] fn multi_1p2c_test(b: &mut Bencher) { b.iter(|| { @@ -644,6 +665,35 @@ mod test { }); } + #[bench] + fn multi_crossbeam_1p2c_test(b: &mut Bencher) { + b.iter(|| { + let q = Arc::new(crossbeam_queue::SegQueue::new()); + let total_work: usize = 1_000_000; + // create worker threads that generate mono increasing index + // in other thread the value should be still 100 + for i in 0..total_work { + q.push(i); + } + + let sum = AtomicUsize::new(0); + let threads = 20; + thread::scope(|s| { + (0..threads).for_each(|_| { + s.spawn(|| { + let mut total = 0; + for _i in 0..total_work / threads { + total += q.block_pop(); + } + sum.fetch_add(total, Ordering::Relaxed); + }); + }); + }); + assert!(q.is_empty()); + assert_eq!(sum.load(Ordering::Relaxed), (0..total_work).sum()); + }); + } + #[bench] fn bulk_1p2c_test(b: &mut Bencher) { b.iter(|| {