Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: use an event-flag-based thread parker on SOLID #97140

Merged
merged 4 commits into from
Jun 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions library/std/src/sys/itron/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,32 @@ pub type ER = int_t;
/// Error code type, `ID` on success
pub type ER_ID = int_t;

/// Service call operational mode
pub type MODE = uint_t;

/// OR waiting condition for an eventflag
pub const TWF_ORW: MODE = 0x01;

/// Object attributes
pub type ATR = uint_t;

/// FIFO wait order
pub const TA_FIFO: ATR = 0;
/// Only one task is allowed to be in the waiting state for the eventflag
pub const TA_WSGL: ATR = 0;
/// The eventflag’s bit pattern is cleared when a task is released from the
/// waiting state for that eventflag.
pub const TA_CLR: ATR = 0x04;

/// Bit pattern of an eventflag
pub type FLGPTN = uint_t;

/// Task or interrupt priority
pub type PRI = int_t;

/// The special value of `PRI` representing the current task's priority.
pub const TPRI_SELF: PRI = 0;

/// Object attributes
pub type ATR = uint_t;

/// Use the priority inheritance protocol
#[cfg(target_os = "solid_asp3")]
pub const TA_INHERIT: ATR = 0x02;
Expand Down Expand Up @@ -90,6 +107,13 @@ pub struct T_CSEM {
pub maxsem: uint_t,
}

#[derive(Clone, Copy)]
#[repr(C)]
pub struct T_CFLG {
pub flgatr: ATR,
pub iflgptn: FLGPTN,
}

#[derive(Clone, Copy)]
#[repr(C)]
pub struct T_CMTX {
Expand Down Expand Up @@ -139,6 +163,24 @@ extern "C" {
pub fn sns_dsp() -> bool_t;
#[link_name = "__asp3_get_tim"]
pub fn get_tim(p_systim: *mut SYSTIM) -> ER;
#[link_name = "__asp3_acre_flg"]
pub fn acre_flg(pk_cflg: *const T_CFLG) -> ER_ID;
#[link_name = "__asp3_del_flg"]
pub fn del_flg(flgid: ID) -> ER;
#[link_name = "__asp3_set_flg"]
pub fn set_flg(flgid: ID, setptn: FLGPTN) -> ER;
#[link_name = "__asp3_clr_flg"]
pub fn clr_flg(flgid: ID, clrptn: FLGPTN) -> ER;
#[link_name = "__asp3_wai_flg"]
pub fn wai_flg(flgid: ID, waiptn: FLGPTN, wfmode: MODE, p_flgptn: *mut FLGPTN) -> ER;
#[link_name = "__asp3_twai_flg"]
pub fn twai_flg(
flgid: ID,
waiptn: FLGPTN,
wfmode: MODE,
p_flgptn: *mut FLGPTN,
tmout: TMO,
) -> ER;
#[link_name = "__asp3_acre_mtx"]
pub fn acre_mtx(pk_cmtx: *const T_CMTX) -> ER_ID;
#[link_name = "__asp3_del_mtx"]
Expand Down
72 changes: 72 additions & 0 deletions library/std/src/sys/itron/wait_flag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::mem::MaybeUninit;
use crate::time::Duration;

use super::{
abi,
error::{expect_success, fail},
time::with_tmos,
};

const CLEAR: abi::FLGPTN = 0;
const RAISED: abi::FLGPTN = 1;

/// A thread parking primitive that is not susceptible to race conditions,
/// but provides no atomic ordering guarantees and allows only one `raise` per wait.
pub struct WaitFlag {
flag: abi::ID,
}

impl WaitFlag {
/// Creates a new wait flag.
pub fn new() -> WaitFlag {
let flag = expect_success(
unsafe {
abi::acre_flg(&abi::T_CFLG {
flgatr: abi::TA_FIFO | abi::TA_WSGL | abi::TA_CLR,
iflgptn: CLEAR,
})
},
&"acre_flg",
);

WaitFlag { flag }
}

/// Wait for the wait flag to be raised.
pub fn wait(&self) {
let mut token = MaybeUninit::uninit();
expect_success(
unsafe { abi::wai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr()) },
&"wai_flg",
);
}

/// Wait for the wait flag to be raised or the timeout to occur.
///
/// Returns whether the flag was raised (`true`) or the operation timed out (`false`).
pub fn wait_timeout(&self, dur: Duration) -> bool {
let mut token = MaybeUninit::uninit();
let res = with_tmos(dur, |tmout| unsafe {
abi::twai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr(), tmout)
});

match res {
abi::E_OK => true,
abi::E_TMOUT => false,
error => fail(error, &"twai_flg"),
}
}

/// Raise the wait flag.
///
/// Calls to this function should be balanced with the number of successful waits.
pub fn raise(&self) {
expect_success(unsafe { abi::set_flg(self.flag, RAISED) }, &"set_flg");
}
}

impl Drop for WaitFlag {
fn drop(&mut self) {
expect_success(unsafe { abi::del_flg(self.flag) }, &"del_flg");
}
}
2 changes: 2 additions & 0 deletions library/std/src/sys/solid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod itron {
pub mod thread;
pub(super) mod time;
use super::unsupported;
pub mod wait_flag;
}

pub mod alloc;
Expand Down Expand Up @@ -43,6 +44,7 @@ pub mod memchr;
pub mod thread_local_dtor;
pub mod thread_local_key;
pub mod time;
pub use self::itron::wait_flag;

mod rwlock;

Expand Down
7 changes: 4 additions & 3 deletions library/std/src/sys_common/thread_parker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ cfg_if::cfg_if! {
))] {
mod futex;
pub use futex::Parker;
} else if #[cfg(windows)] {
pub use crate::sys::thread_parker::Parker;
} else if #[cfg(target_family = "unix")] {
} else if #[cfg(target_os = "solid_asp3")] {
mod wait_flag;
pub use wait_flag::Parker;
} else if #[cfg(any(windows, target_family = "unix"))] {
pub use crate::sys::thread_parker::Parker;
} else {
mod generic;
Expand Down
96 changes: 96 additions & 0 deletions library/std/src/sys_common/thread_parker/wait_flag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//! A wait-flag-based thread parker.
//!
//! Some operating systems provide low-level parking primitives like wait counts,
//! event flags or semaphores which are not susceptible to race conditions (meaning
//! the wakeup can occur before the wait operation). To implement the `std` thread
//! parker on top of these primitives, we only have to ensure that parking is fast
//! when the thread token is available, the atomic ordering guarantees are maintained
//! and spurious wakeups are minimized.
//!
//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`,
//! `PARKED` and `NOTIFIED`:
//! * `EMPTY` means the token has not been made available, but the thread is not
//! currently waiting on it.
//! * `PARKED` means the token is not available and the thread is parked.
//! * `NOTIFIED` means the token is available.
//!
//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from
//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and
//! execution can continue without calling into the OS. If the state was `EMPTY`,
//! the token is not available and the thread waits on the primitive (here called
//! "wait flag").
//!
//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread
//! is or will be sleeping on the wait flag, so we raise it. Only the first thread
//! to call `unpark` will raise the wait flag, so spurious wakeups are avoided
//! (this is especially important for semaphores).

use crate::pin::Pin;
use crate::sync::atomic::AtomicI8;
use crate::sync::atomic::Ordering::SeqCst;
use crate::sys::wait_flag::WaitFlag;
use crate::time::Duration;

const EMPTY: i8 = 0;
const PARKED: i8 = -1;
const NOTIFIED: i8 = 1;

pub struct Parker {
state: AtomicI8,
wait_flag: WaitFlag,
}

impl Parker {
/// Construct a parker for the current thread. The UNIX parker
/// implementation requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
}

// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
pub unsafe fn park(self: Pin<&Self>) {
// The state values are chosen so that this subtraction changes
// `NOTIFIED` to `EMPTY` and `EMPTY` to `PARKED`.
let state = self.state.fetch_sub(1, SeqCst);
match state {
EMPTY => (),
NOTIFIED => return,
_ => panic!("inconsistent park state"),
}

self.wait_flag.wait();

// We need to do a load here to use `Acquire` ordering.
self.state.swap(EMPTY, SeqCst);
}

// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
let state = self.state.fetch_sub(1, SeqCst);
match state {
EMPTY => (),
NOTIFIED => return,
_ => panic!("inconsistent park state"),
}

let wakeup = self.wait_flag.wait_timeout(dur);
let state = self.state.swap(EMPTY, SeqCst);
if state == NOTIFIED && !wakeup {
// The token was made available after the wait timed out, but before
// we reset the state, so we need to reset the wait flag to avoid
// spurious wakeups. This wait has no timeout, but we know it will
// return quickly, as the unparking thread will definitely raise the
// flag if it has not already done so.
self.wait_flag.wait();
}
joboet marked this conversation as resolved.
Show resolved Hide resolved
}

// This implementation doesn't require `Pin`, but other implementations do.
pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, SeqCst);
joboet marked this conversation as resolved.
Show resolved Hide resolved

if state == PARKED {
self.wait_flag.raise();
}
}
}