Skip to content

Commit af307d5

Browse files
committed
✨ (sync): Introduce sync::once event channel (there are still some docs to be added)
1 parent 752553e commit af307d5

File tree

4 files changed

+274
-2
lines changed

4 files changed

+274
-2
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ indexmap = "2.7.0"
3030
ron = "0.8.1"
3131
serde = { version = "1.0.217", features = ["derive"] }
3232
thiserror = "2.0.11"
33-
tokio = { version = "1.43.0", features = ["rt"] }
33+
tokio = { version = "1.43.0", features = ["rt", "sync"] }
3434
tokio-util = { version = "0.7.13", features = ["rt"] }
3535

3636
[dev-dependencies]
37-
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
37+
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "test-util", "time"] }
3838
tokio-util = { version = "0.7.13", features = ["time"] }
3939

4040
[build-dependencies]

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
pub mod collections;
77
/// Extensions to the [`std::result`](https://doc.rust-lang.org/stable/std/result/index.html) module.
88
pub mod result;
9+
/// Extensions to the [`std::sync`](https://doc.rust-lang.org/stable/std/sync/index.html) &
10+
/// [`tokio::sync`](https://docs.rs/tokio/latest/tokio/sync/index.html) module.
11+
pub mod sync;
912
/// Extensions to the [`std::task`](https://doc.rust-lang.org/stable/std/task/index.html) &
1013
/// [`tokio::task`](https://docs.rs/tokio/latest/tokio/task/index.html) module.
1114
pub mod task;

src/sync.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod once;

src/sync/once.rs

+268
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
//! A simple one-time channel that can `trigger` and `wait` on a single
2+
//! `"untyped event"` between two tasks.
3+
//!
4+
//! Can be regarded as a thin wrapper layer over
5+
//! [`tokio::sync::oneshot<()>`](https://docs.rs/tokio/latest/tokio/sync/oneshot/index.html)
6+
//! channel.
7+
//!
8+
//! The [`once_event`] function is used to create a [`OnceTrigger`] and [`OnceWaiter`]
9+
//! handle pair that form the channel.
10+
//!
11+
//! The [`OnceTrigger`] handle is used by the producer to trigger the event.
12+
//! The [`OnceWaiter`] handle is used by the consumer to wait for the event.
13+
//!
14+
//! Each handle can be used on separate tasks.
15+
//!
16+
//! Since the [`OnceTrigger::trigger`] method is not async, it can be used anywhere.
17+
//! This includes triggering between two runtimes, and using it from non-async code.
18+
//!
19+
//! # Examples
20+
//!
21+
//! ```
22+
//! use est::sync::once::once_event;
23+
//!
24+
//! #[tokio::main]
25+
//! async fn main() {
26+
//! let (trigger, waiter) = once_event();
27+
//!
28+
//! tokio::spawn(async move {
29+
//! if trigger.trigger() {
30+
//! println!("event triggered");
31+
//! } else {
32+
//! println!("the waiter dropped");
33+
//! }
34+
//! });
35+
//!
36+
//! if waiter.await {
37+
//! println!("event received");
38+
//! } else {
39+
//! println!("the trigger dropped");
40+
//! }
41+
//! }
42+
//! ```
43+
//!
44+
//! To use a [`OnceWaiter`] in a [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html)
45+
//! loop, add `&mut` in front of the waiter.
46+
//!
47+
//! ```
48+
//! use est::sync::once::once_event;
49+
//! use tokio::time::{interval, sleep, Duration};
50+
//!
51+
//! #[tokio::main]
52+
//! # async fn _doc() {}
53+
//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
54+
//! async fn main() {
55+
//! let (shutdown_t, mut shutdown_w) = once_event();
56+
//! let mut interval = interval(Duration::from_millis(100));
57+
//!
58+
//! # let handle =
59+
//! tokio::spawn(async move {
60+
//! sleep(Duration::from_secs(1)).await;
61+
//! shutdown_t.trigger();
62+
//! });
63+
//!
64+
//! loop {
65+
//! tokio::select! {
66+
//! _ = interval.tick() => println!("Another 100ms"),
67+
//! _ = &mut shutdown_w => {
68+
//! println!("Shutdown!");
69+
//! break;
70+
//! }
71+
//! }
72+
//! }
73+
//! # handle.await.unwrap();
74+
//! }
75+
//! ```
76+
//!
77+
//! To use a [`OnceTrigger`] from a destructor, put it in an [`Option`] and call
78+
//! [`Option::take`].
79+
//!
80+
//! ```
81+
//! use est::sync::once::{once_event, OnceTrigger};
82+
//!
83+
//! struct TriggerOnDrop {
84+
//! trigger: Option<OnceTrigger>,
85+
//! }
86+
//! impl Drop for TriggerOnDrop {
87+
//! fn drop(&mut self) {
88+
//! if let Some(trigger) = self.trigger.take() {
89+
//! trigger.trigger();
90+
//! }
91+
//! }
92+
//! }
93+
//!
94+
//! #[tokio::main]
95+
//! # async fn _doc() {}
96+
//! # #[tokio::main(flavor = "current_thread")]
97+
//! async fn main() {
98+
//! let (trigger, waiter) = once_event();
99+
//!
100+
//! let trigger_on_drop = TriggerOnDrop { trigger: Some(trigger) };
101+
//! drop(trigger_on_drop);
102+
//!
103+
//! assert!(waiter.await);
104+
//! }
105+
//! ```
106+
107+
use std::{
108+
future::Future,
109+
pin::Pin,
110+
task::{Context, Poll},
111+
};
112+
use tokio::sync::oneshot::{channel, error::TryRecvError, Receiver, Sender};
113+
114+
#[derive(Debug)]
115+
pub struct OnceTrigger(Sender<()>);
116+
117+
impl OnceTrigger {
118+
pub fn trigger(self) -> bool {
119+
self.0.send(()).is_ok()
120+
}
121+
}
122+
123+
#[derive(Default, Debug, Copy, Clone, Eq, PartialEq, Hash)]
124+
pub enum Triggered {
125+
#[default]
126+
Pending,
127+
Triggered,
128+
Dropped,
129+
}
130+
131+
#[derive(Debug)]
132+
pub struct OnceWaiter {
133+
recv: Receiver<()>,
134+
triggered: Triggered,
135+
}
136+
137+
impl OnceWaiter {
138+
pub fn triggered(&mut self) -> Triggered {
139+
match self.triggered {
140+
Triggered::Pending => {
141+
let triggered = match self.recv.try_recv() {
142+
Ok(_) => Triggered::Triggered,
143+
Err(TryRecvError::Closed) => Triggered::Dropped,
144+
_ => Triggered::Pending,
145+
};
146+
self.triggered = triggered;
147+
triggered
148+
}
149+
triggered => triggered,
150+
}
151+
}
152+
153+
pub fn has_been_triggered(mut self) -> Triggered {
154+
self.triggered()
155+
}
156+
157+
pub fn blocking_wait(self) -> bool {
158+
self.recv.blocking_recv().is_ok()
159+
}
160+
}
161+
162+
impl Future for OnceWaiter {
163+
type Output = bool;
164+
165+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166+
Pin::new(&mut self.recv).poll(cx).map(|r| r.is_ok())
167+
}
168+
}
169+
170+
pub fn once_event() -> (OnceTrigger, OnceWaiter) {
171+
let triggered = Default::default();
172+
let (send, recv) = channel();
173+
174+
(OnceTrigger(send), OnceWaiter { recv, triggered })
175+
}
176+
177+
#[cfg(test)]
178+
mod tests {
179+
use super::*;
180+
181+
#[tokio::test(flavor = "multi_thread")]
182+
async fn async_wait() {
183+
let (trigger, waiter) = once_event();
184+
tokio::spawn(async move {
185+
assert!(trigger.trigger());
186+
});
187+
assert!(waiter.await);
188+
189+
let (trigger, waiter) = once_event();
190+
drop(waiter);
191+
assert!(!trigger.trigger());
192+
193+
let (trigger, waiter) = once_event();
194+
drop(trigger);
195+
assert!(!waiter.await);
196+
}
197+
198+
#[test]
199+
fn blocking_wait() {
200+
let (trigger, waiter) = once_event();
201+
std::thread::spawn(move || {
202+
assert!(trigger.trigger());
203+
});
204+
assert!(waiter.blocking_wait());
205+
206+
let (trigger, waiter) = once_event();
207+
drop(waiter);
208+
assert!(!trigger.trigger());
209+
210+
let (trigger, waiter) = once_event();
211+
drop(trigger);
212+
assert!(!waiter.blocking_wait());
213+
}
214+
215+
#[test]
216+
fn triggered() {
217+
let (trigger, mut waiter) = once_event();
218+
assert_eq!(waiter.triggered(), Triggered::Pending);
219+
assert_eq!(waiter.triggered(), Triggered::Pending);
220+
assert!(trigger.trigger());
221+
assert_eq!(waiter.triggered(), Triggered::Triggered);
222+
assert_eq!(waiter.triggered(), Triggered::Triggered);
223+
224+
let (trigger, mut waiter) = once_event();
225+
drop(trigger);
226+
assert_eq!(waiter.triggered(), Triggered::Dropped);
227+
assert_eq!(waiter.triggered(), Triggered::Dropped);
228+
}
229+
230+
#[test]
231+
fn has_been_triggered() {
232+
let (trigger, waiter) = once_event();
233+
assert_eq!(waiter.has_been_triggered(), Triggered::Pending);
234+
assert!(!trigger.trigger());
235+
236+
let (trigger, waiter) = once_event();
237+
assert!(trigger.trigger());
238+
assert_eq!(waiter.has_been_triggered(), Triggered::Triggered);
239+
240+
let (trigger, waiter) = once_event();
241+
drop(trigger);
242+
assert_eq!(waiter.has_been_triggered(), Triggered::Dropped);
243+
}
244+
245+
#[tokio::test(flavor = "multi_thread")]
246+
async fn tokio_select() {
247+
use std::time::Duration;
248+
use tokio::time::{interval, sleep};
249+
250+
let mut ticks = 0;
251+
let mut interval = interval(Duration::from_millis(500));
252+
let (trigger, mut waiter) = once_event();
253+
254+
tokio::spawn(async move {
255+
sleep(Duration::from_millis(1250)).await;
256+
trigger.trigger();
257+
});
258+
259+
loop {
260+
tokio::select! {
261+
_ = interval.tick() => ticks += 1,
262+
_ = &mut waiter => break,
263+
}
264+
}
265+
266+
assert_eq!(ticks, 3);
267+
}
268+
}

0 commit comments

Comments
 (0)