
Strange atomic behavior (aarch64-apple-darwin) #146719

@frostyplanet

Description


I was analyzing a suspected deadlock in my async channel library, running on the tokio runtime on the ARM aarch64 platform.
I previously opened an issue against tokio (tokio-rs/tokio#7589) and a PR (tokio-rs/tokio#7622) to fix the behavior under Miri,
but the exact problem on ARM has not been concluded; it looks like there is still a problem that has not been pinpointed.

I found unexpected behavior in the test log: some wakers were still not woken.

The Code
The related code is as follows:
https://github.com/frostyplanet/crossfire-rs/blob/62fda2bf5238016b3b49aa7187fc8fc4637405e4/src/waker_registry.rs#L344

struct RegistryMultiInner<P> {
    queue: VecDeque<Weak<WakerInner<P>>>,
}

pub struct RegistryMulti<P> {
    is_empty: AtomicBool,
    inner: Mutex<RegistryMultiInner<P>>, // this is a parking_lot::Mutex
    seq: AtomicUsize,
}
impl<P> RegistryMulti<P> {
    #[inline(always)]
    fn reg_waker(&self, waker: &ChannelWaker<P>) {
        let weak = waker.weak();
        let mut guard = self.inner.lock();
        let seq = self.seq.fetch_add(1, Ordering::Release);
        waker.set_seq(seq);
        if guard.queue.is_empty() {
            self.is_empty.store(false, Ordering::SeqCst);
        }
        guard.queue.push_back(weak);
    }

    #[inline(always)]
    fn pop(&self) -> Option<ChannelWaker<P>> {
        if self.is_empty.load(Ordering::SeqCst) {
            return None;
        }
        let mut guard = self.inner.lock();
        let mut waker = None;
        loop {
            if let Some(weak) = guard.queue.pop_front() {
                if let Some(inner) = weak.upgrade() {
                    waker = Some(ChannelWaker::from_arc(inner));
                    break;
                }
            } else {
                break;
            }
        }
        if guard.queue.is_empty() {
            self.is_empty.store(true, Ordering::SeqCst);
        }
        return waker;
    }
    #[inline(always)]
    fn fire<F>(&self, handle: F, _tag: &str) -> WakeResult
    where
        F: Fn(&ChannelWaker<P>) -> WakeResult,
    {
        if let Some(waker) = self.pop() {
            let r = handle(&waker);
            trace_log!("wake {} {:?} {:?}", _tag, waker, r);
            if r.is_done() {
                return r;
            }
            let seq = self.seq.load(Ordering::SeqCst);
            while let Some(waker) = self.pop() {
                let r = handle(&waker);
                trace_log!("wake {} {:?} {:?}", _tag, waker, r);
                if r.is_done() {
                    return r;
                }
                // The latest seq in RegistryMulti is always last_waker.get_seq() + 1.
                // Because some wakers (issued by sink / stream) might stay in INIT the whole time,
                // this prevents a dead-loop where they are woken up and immediately re-registered.
                if waker.get_seq().wrapping_add(1) >= seq {
                    trace_log!("stop {} wake at {}", _tag, seq);
                    return WakeResult::Next;
                }
            }
        }
        WakeResult::Next
    }
}
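
To make the stop condition concrete (numbers hypothetical, chosen to match the log below): if the last popped waker carries seq 3775 while RegistryMulti::seq has already advanced to 3776, the check holds and fire() stops.

// Worked example of the stop condition in fire():
let (waker_seq, registry_seq) = (3775usize, 3776usize);
assert!(waker_seq.wrapping_add(1) >= registry_seq); // true, so fire() returns WakeResult::Next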

https://github.com/frostyplanet/crossfire-rs/blob/62fda2bf5238016b3b49aa7187fc8fc4637405e4/src/locked_waker.rs#L112

pub struct WakerInner<P> {
    state: AtomicU8,
    seq: AtomicUsize,
    waker: UnsafeCell<WakerType>,
    #[allow(dead_code)]
    payload: UnsafeCell<P>,
}
impl<P> WakerInner<P> {
    #[inline(always)]
    pub fn get_seq(&self) -> usize {
        self.seq.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn set_seq(&self, seq: usize) {
        self.seq.store(seq, Ordering::Relaxed);
    }
    #[inline(always)]
    pub fn commit_waiting(&self) -> u8 {
        if let Err(s) = self.try_change_state(WakerState::Init, WakerState::Waiting) {
            return s;
        } else {
            return WakerState::Waiting as u8;
        }
    }
    #[inline(always)]
    pub fn try_change_state(&self, cur: WakerState, new_state: WakerState) -> Result<(), u8> {
        if let Err(s) = self.state.compare_exchange(
            cur as u8,
            new_state as u8,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            return Err(s);
        }
        return Ok(());
    }

    #[inline(always)]
    pub fn wake(&self) -> WakeResult {
        // This runs after we got the waker from waker_registry, which already
        // establishes a happens-before relationship; state >= WakerState::Waiting is certain.
        let mut state = self.get_state_relaxed();
        loop {
            if state >= WakerState::Waked as u8 {
                return WakeResult::Skip;
            } else if state == WakerState::Waiting as u8 {
                self.state.store(WakerState::Waked as u8, Ordering::SeqCst);
                self._wake_nolock();
                return WakeResult::Waked;
            } else {
                match self.state.compare_exchange_weak(
                    WakerState::Init as u8,
                    WakerState::Waked as u8,
                    Ordering::SeqCst,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        self._wake_nolock();
                        return WakeResult::Next;
                    }
                    Err(s) => {
                        state = s;
                    }
                }
            }
        }
    }
}
impl<P> fmt::Debug for WakerInner<P> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "waker({})", self.get_seq())
    }
}
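
For reference, the excerpts above assume state and result types roughly like the following. This is a simplified sketch; the exact definitions, discriminant values, and full set of variants live in locked_waker.rs and may differ (e.g. the log below shows a state=5 that this sketch does not model).

// Sketch only: variant names are taken from the code above; the numeric
// discriminants and the is_done() mapping are assumptions.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum WakerState {
    Init = 0,    // registered, but the future has not committed to waiting yet
    Waiting = 1, // commit_waiting() succeeded; safe to wake directly
    Waked = 2,   // already woken; further wake() calls are skipped
}

#[derive(Clone, Copy, Debug)]
pub enum WakeResult {
    Waked, // a Waiting waker was woken; fire() treats this as done
    Skip,  // the waker was already Waked; try the next one
    Next,  // woke an Init waker, or the queue was exhausted; keep going
}

impl WakeResult {
    pub fn is_done(&self) -> bool {
        matches!(self, WakeResult::Waked)
    }
}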

The Log
The following log was captured from a test workflow on a GitHub macos-15 (ARM CPU) runner:
https://github.com/frostyplanet/crossfire-rs/actions/runs/17823475038
The original log can be downloaded from the workflow artifacts.

In the test case https://github.com/frostyplanet/crossfire-rs/blob/62fda2bf5238016b3b49aa7187fc8fc4637405e4/src/tests/test_async.rs#L734

(5 sender tokio tasks, one receiver tokio task; roughly the shape sketched below)
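
The sketch below shows only the task topology, with tokio::sync::mpsc standing in for crossfire's own channel so that it compiles; see the linked test for the real code.

use tokio::sync::mpsc;

// 5 sender tasks contend on a small bounded channel, so they frequently
// register wakers and park; a single receiver drains the channel, waking
// parked senders as capacity frees up.
#[tokio::test]
async fn five_senders_one_receiver_shape() {
    const N_MSGS: usize = 10_000;
    let (tx, mut rx) = mpsc::channel::<usize>(8); // small capacity forces senders to park
    let mut senders = Vec::new();
    for _ in 0..5 {
        let tx = tx.clone();
        senders.push(tokio::spawn(async move {
            for i in 0..N_MSGS {
                tx.send(i).await.unwrap(); // waits (registering a waker) when full
            }
        }));
    }
    drop(tx); // close the channel once all senders are done
    let receiver = tokio::spawn(async move {
        let mut n = 0;
        while rx.recv().await.is_some() {
            n += 1; // each recv frees capacity and may wake a parked sender
        }
        assert_eq!(n, 5 * N_MSGS);
    });
    for s in senders {
        s.await.unwrap();
    }
    receiver.await.unwrap();
}

The relevant excerpt from the captured log: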
...
[2025-09-18 09:00:00.870621][DEBUG][ThreadId(5)][waker_registry.rs:50] tx: reg waker(3774)
[2025-09-18 09:00:00.870622][DEBUG][ThreadId(5)][waker_registry.rs:84] tx: cancel_reuse waker(3774) Done
[2025-09-18 09:00:00.870623][DEBUG][ThreadId(5)][async_tx.rs:274] tx: sender_reg_and_try Some(waker(3774)) state=5
[2025-09-18 09:00:00.870624][DEBUG][ThreadId(5)][async_tx.rs:280] tx: send None done
[2025-09-18 09:00:00.870623][DEBUG][ThreadId(2)][state.rs:267] waker.wake_by_ref (running)
[2025-09-18 09:00:00.870625][DEBUG][ThreadId(5)][waker_registry.rs:50] tx: reg waker(3775)
[2025-09-18 09:00:00.870627][DEBUG][ThreadId(5)][async_tx.rs:274] tx: sender_reg_and_try Some(waker(3775)) state=1
[2025-09-18 09:00:00.870626][DEBUG][ThreadId(2)][state.rs:267] waker.wake_by_ref (running)
[2025-09-18 09:00:00.870638][DEBUG][ThreadId(2)][waker_registry.rs:385] wake tx waker(3774) Next
[2025-09-18 09:00:00.870642][DEBUG][ThreadId(5)][waker_registry.rs:50] tx: reg waker(3776)
[2025-09-18 09:00:00.870641][DEBUG][ThreadId(2)][state.rs:259] waker.wake_by_ref already notified
[2025-09-18 09:00:00.870644][DEBUG][ThreadId(5)][async_tx.rs:274] tx: sender_reg_and_try Some(waker(3776)) state=1
[2025-09-18 09:00:00.870645][DEBUG][ThreadId(2)][waker_registry.rs:392] wake tx waker(3776) Waked
[2025-09-18 09:00:00.870712][DEBUG][ThreadId(2)][async_rx.rs:235] rx: recv
[2025-09-18 09:00:00.870723][DEBUG][ThreadId(2)][waker_registry.rs:385] wake tx waker(3776) Waked

My analysis

Sender A (in tokio worker ThreadId(5)):

[2025-09-18 09:00:00.870625][DEBUG][ThreadId(5)][waker_registry.rs:50] tx: reg waker(3775)
[2025-09-18 09:00:00.870627][DEBUG][ThreadId(5)][async_tx.rs:274] tx: sender_reg_and_try Some(waker(3775)) state=1
// has set the waker state to Waiting via commit_waiting()

Sender B (in tokio worker ThreadId(5)):

[2025-09-18 09:00:00.870642][DEBUG][ThreadId(5)][waker_registry.rs:50] tx: reg waker(3776)
[2025-09-18 09:00:00.870644][DEBUG][ThreadId(5)][async_tx.rs:274] tx: sender_reg_and_try Some(waker(3776)) state=1
// has set the waker state to Waiting via commit_waiting()

Sender A and B ran in sequence on the same tokio worker. Each incremented RegistryMulti::seq with fetch_add(1, Release) and stored the result into its waker with seq.store(Relaxed),
yielding 3775 and 3776, which looks fine.
Because these instructions are protected by the parking_lot::Mutex, I think they cannot be reordered outside the scope of the mutex guard.
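
To make that argument testable, here is a minimal model of what I believe reg_waker()/pop() guarantee, with std::sync::Mutex standing in for parking_lot::Mutex (both give acquire/release semantics on lock/unlock). If the reasoning is right, the assert should never fire:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    for _ in 0..100_000 {
        let registry_seq = Arc::new(AtomicUsize::new(3775));
        let queue: Arc<Mutex<VecDeque<Arc<AtomicUsize>>>> =
            Arc::new(Mutex::new(VecDeque::new()));

        let (rs, q) = (registry_seq.clone(), queue.clone());
        let writer = thread::spawn(move || {
            let waker_seq = Arc::new(AtomicUsize::new(usize::MAX)); // MAX = "unset"
            let mut guard = q.lock().unwrap();
            let seq = rs.fetch_add(1, Ordering::Release); // like RegistryMulti::seq
            waker_seq.store(seq, Ordering::Relaxed);      // like set_seq()
            guard.push_back(waker_seq);
        }); // dropping the guard releases everything written above

        let q = queue.clone();
        let reader = thread::spawn(move || {
            let popped = q.lock().unwrap().pop_front(); // acquire pairs with writer's unlock
            if let Some(waker_seq) = popped {
                // If the pop observed the push, it must also observe the seq store:
                assert_ne!(waker_seq.load(Ordering::Relaxed), usize::MAX);
            }
        });

        writer.join().unwrap();
        reader.join().unwrap();
    }
}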

Receiver (in tokio worker ThreadId(2)):
First poll():

// triggered by on_recv
[2025-09-18 09:00:00.870638][DEBUG][ThreadId(2)][waker_registry.rs:385] wake tx waker(3774) Next
[2025-09-18 09:00:00.870645][DEBUG][ThreadId(2)][waker_registry.rs:392] wake tx waker(3776) Waked   // which is wrong, expected to be 3775

The above is unexpected; it should have been 3775.
The second poll() pop()s another waker, 3776:

[2025-09-18 09:00:00.870712][DEBUG][ThreadId(2)][async_rx.rs:235] rx: recv
// triggered by on_recv
[2025-09-18 09:00:00.870723][DEBUG][ThreadId(2)][waker_registry.rs:385] wake tx waker(3776) Waked

NOTE:

Possible reason, by my guess (I cannot confirm the generated assembly, since I have no ARM machine other than the GitHub runner):
In the sender thread, the trace_log macros and get_seq() were optimized into non-atomic accesses, bypassing get_seq() and reusing some cached result from reg_waker(); somehow the receiver thread then read a stale value from a different core's cache, i.e. get_seq() did not read from memory.
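
One experiment that could rule this theory in or out (a diagnostic I have not run, not a proposed fix): strengthen the seq accessors so neither the compiler nor the CPU has latitude to reuse a cached value. A sketch of the variant:

use std::sync::atomic::{AtomicUsize, Ordering};

// Experimental variant of WakerInner's seq accessors. If the misbehavior
// disappears with SeqCst here, the Relaxed accesses are implicated; if it
// persists, the problem is elsewhere.
pub struct Seq(AtomicUsize);

impl Seq {
    #[inline(always)]
    pub fn get_seq(&self) -> usize {
        self.0.load(Ordering::SeqCst) // was Ordering::Relaxed
    }

    #[inline(always)]
    pub fn set_seq(&self, seq: usize) {
        self.0.store(seq, Ordering::SeqCst); // was Ordering::Relaxed
    }
}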
