From 621a06118ce185976db2a91b97fd03b0f6393e4a Mon Sep 17 00:00:00 2001
From: cssivision
Date: Sun, 27 Dec 2020 15:34:47 +0800
Subject: [PATCH] chore: Replace deprecated compare_and_swap with compare_exchange

---
 tokio/src/io/split.rs               |  6 ++++-
 tokio/src/runtime/queue.rs          |  5 ++--
 tokio/src/sync/mpsc/block.rs        | 37 +++++++++++++----------------
 tokio/src/sync/mpsc/list.rs         | 17 +++++++------
 tokio/src/sync/task/atomic_waker.rs | 11 +++++----
 5 files changed, 42 insertions(+), 34 deletions(-)

diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs
index fd3273ee28a..732eb3b3aa0 100644
--- a/tokio/src/io/split.rs
+++ b/tokio/src/io/split.rs
@@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
 
 impl<T> Inner<T> {
     fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
-        if !self.locked.compare_and_swap(false, true, Acquire) {
+        if self
+            .locked
+            .compare_exchange(false, true, Acquire, Acquire)
+            .is_ok()
+        {
             Poll::Ready(Guard { inner: self })
         } else {
             // Spin... but investigate a better strategy
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index cdf4009c08b..e3504d6d7c4 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -194,13 +194,14 @@ impl<T> Local<T> {
         // work. This is because all tasks are pushed into the queue from the
         // current thread (or memory has been acquired if the local queue handle
         // moved).
-        let actual = self.inner.head.compare_and_swap(
+        let actual = self.inner.head.compare_exchange(
             prev,
             pack(head.wrapping_add(n), head.wrapping_add(n)),
             Release,
+            Release,
         );
 
-        if actual != prev {
+        if actual.is_err() {
             // We failed to claim the tasks, losing the race. Return out of
             // this function and try the full `push` routine again. The queue
             // may not be full anymore.
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index e062f2b7303..68337405a10 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -258,17 +258,16 @@ impl<T> Block<T> {
     pub(crate) unsafe fn try_push(
         &self,
         block: &mut NonNull<Block<T>>,
-        ordering: Ordering,
     ) -> Result<(), NonNull<Block<T>>> {
         block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
             .next
-            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+            .compare_exchange(ptr::null_mut(), block.as_ptr(), AcqRel, Acquire);
 
-        match NonNull::new(next_ptr) {
-            Some(next_ptr) => Err(next_ptr),
-            None => Ok(()),
+        match next_ptr {
+            Ok(_) => Ok(()),
+            Err(v) => Err(NonNull::new_unchecked(v)),
         }
     }
 
@@ -306,20 +305,18 @@ impl<T> Block<T> {
         //
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
-        let next = NonNull::new(self.next.compare_and_swap(
-            ptr::null_mut(),
-            new_block.as_ptr(),
-            AcqRel,
-        ));
-
-        let next = match next {
-            Some(next) => next,
-            None => {
-                // The compare-and-swap succeeded and the newly allocated block
-                // is successfully pushed.
-                return new_block;
-            }
-        };
+        let next =
+            match self
+                .next
+                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+            {
+                Ok(_) => {
+                    // The compare-and-swap succeeded and the newly allocated block
+                    // is successfully pushed.
+                    return new_block;
+                }
+                Err(v) => unsafe { NonNull::new_unchecked(v) },
+            };
 
         // There already is a next block in the linked list. The newly allocated
         // block could be dropped and the discovered next block returned;
@@ -333,7 +330,7 @@ impl<T> Block<T> {
 
         // TODO: Should this iteration be capped?
         loop {
-            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+            let actual = unsafe { curr.as_ref().try_push(&mut new_block) };
 
             curr = match actual {
                 Ok(_) => {
diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs
index 2f4c532a7d7..f5a51e3f816 100644
--- a/tokio/src/sync/mpsc/list.rs
+++ b/tokio/src/sync/mpsc/list.rs
@@ -6,7 +6,7 @@ use crate::sync::mpsc::block::{self, Block};
 
 use std::fmt;
 use std::ptr::NonNull;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
 
 /// List queue transmit handle
 pub(crate) struct Tx<T> {
@@ -140,11 +140,14 @@ impl<T> Tx<T> {
             //
             // Acquire is not needed as any "actual" value is not accessed.
             // At this point, the linked list is walked to acquire blocks.
-            let actual =
-                self.block_tail
-                    .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
-
-            if actual == block_ptr {
+            let actual = self.block_tail.compare_exchange(
+                block_ptr,
+                next_block.as_ptr(),
+                Release,
+                Release,
+            );
+
+            if actual.is_ok() {
                 // Synchronize with any senders
                 let tail_position = self.tail_position.fetch_add(0, Release);
 
@@ -191,7 +194,7 @@ impl<T> Tx<T> {
 
         // TODO: Unify this logic with Block::grow
         for _ in 0..3 {
-            match curr.as_ref().try_push(&mut block, AcqRel) {
+            match curr.as_ref().try_push(&mut block) {
                 Ok(_) => {
                     reused = true;
                     break;
diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs
index ae4cac7c247..14ee44e1193 100644
--- a/tokio/src/sync/task/atomic_waker.rs
+++ b/tokio/src/sync/task/atomic_waker.rs
@@ -171,8 +171,11 @@ impl AtomicWaker {
     where
         W: WakerRef,
     {
-        match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
-            WAITING => {
+        match self
+            .state
+            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
+        {
+            Ok(WAITING) => {
                 unsafe {
                     // Locked acquired, update the waker cell
                     self.waker.with_mut(|t| *t = Some(waker.into_waker()));
@@ -212,7 +215,7 @@
                     }
                 }
             }
-            WAKING => {
+            Err(WAKING) => {
                 // Currently in the process of waking the task, i.e.,
                 // `wake` is currently being called on the old waker.
                 // So, we call wake on the new waker.
@@ -221,7 +224,7 @@
                 // This is equivalent to a spin lock, so use a spin hint.
                 atomic::spin_loop_hint();
             }
-            state => {
+            Err(state) => {
                 // In this case, a concurrent thread is holding the
                 // "registering" lock. This probably indicates a bug in the
                 // caller's code as racing to call `register` doesn't make much
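
Background on the migration (an illustrative note, not part of the patch): `compare_and_swap(current, new, ordering)` was deprecated in Rust 1.50 and returns the previous value, so callers had to compare that value against `current` to learn whether the swap happened. `compare_exchange(current, new, success, failure)` takes a separate failure ordering (which must not be `Release` or `AcqRel`) and returns a `Result`: `Ok(previous)` when the value was swapped, `Err(actual)` when it was not. Below is a minimal standalone sketch of the same rewrite the patch applies to the `locked` flag in split.rs; the `try_lock` helper and its names are hypothetical and not taken from Tokio.

    use std::sync::atomic::{AtomicBool, Ordering::Acquire};

    /// Attempts to take a spin-style lock guarded by an AtomicBool.
    fn try_lock(locked: &AtomicBool) -> bool {
        // Deprecated form: compare_and_swap returned the previous value, so
        // `!previous` (i.e. previous == false) meant the swap succeeded:
        //     !locked.compare_and_swap(false, true, Acquire)
        //
        // Replacement: compare_exchange returns Ok(previous) only when the
        // value was actually swapped, so success is simply `.is_ok()`.
        locked
            .compare_exchange(false, true, Acquire, Acquire)
            .is_ok()
    }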