-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocal_executor.rs
200 lines (170 loc) · 6.1 KB
/
local_executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//
// Copyright Stjepan Glavina <stjepang@gmail.com>
// Copyright The OCaml Lwt Interop contributors
//
// Apache License
// This thread-local executor implementation was obtained by forking the
// thread-local executor from the async-executor crate [1]. async-executor
// contains both a multi-threaded executor and a thread-local variant, which is
// built on top of the multi-threaded one. This implementation is basically just
// the multi-threaded executor inlined into the thread-local one, with all
// multi-thread synchronization primitives removed and unused code dropped. A
// notable addition is a callback mechanism that notifies an external event loop
// when new pending tasks are available for running.
//
// [1] https://github.com/smol-rs/async-executor/blob/master/src/lib.rs
// Good read on async streams, executors, reactors and tasks:
// https://www.qovery.com/blog/a-guided-tour-of-streams-in-rust
use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::Arc;
use std::task::Waker;
use async_task::Runnable;
use concurrent_queue::ConcurrentQueue;
use slab::Slab;
#[doc(no_inline)]
pub use async_task::Task;
use crate::{borrow, borrow_mut};
/// A thread-local executor.
///
/// The handle is a thin wrapper around a reference-counted `State`; cloning it
/// (via the derived `Clone`) yields another handle to the same state rather
/// than a new, independent executor.
#[derive(Clone)]
pub struct LocalExecutor {
/// The executor state, shared by every clone of this handle.
state: Rc<State>,
}
// Marker impls (no methods): opt `LocalExecutor` into `UnwindSafe` and
// `RefUnwindSafe` so that it may be used across `std::panic::catch_unwind`
// boundaries.
// NOTE(review): presumably carried over from the upstream async-executor
// fork — confirm they are still wanted here.
impl UnwindSafe for LocalExecutor {}
impl RefUnwindSafe for LocalExecutor {}
impl fmt::Debug for LocalExecutor {
    /// Formats the executor by delegating to `debug_executor`, labelling the
    /// output with the type name `LocalExecutor`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(self, "LocalExecutor", formatter)
    }
}
impl LocalExecutor {
    /// Creates a new executor with an empty run queue, no active tasks and no
    /// notifier installed.
    pub fn new() -> LocalExecutor {
        LocalExecutor {
            state: Rc::new(State::new()),
        }
    }

    /// Installs the callback that is invoked each time a task is pushed onto
    /// the run queue.
    ///
    /// The notifier is captured per task at spawn time, so only tasks spawned
    /// after this call use the newly installed notifier.
    pub fn set_notifier(&mut self, notifier: impl Notifier + Send + Sync + 'static) {
        self.state.set_notifier(notifier)
    }

    /// Returns `true` if there are no unfinished tasks.
    // NOTE(review): presumably not called from within the crate at the moment,
    // hence the lint suppression — confirm.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        borrow!(self.state.active).is_empty()
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// Returns `true` if a task was taken from the queue and run, `false` if
    /// the queue yielded nothing.
    pub fn try_tick(&self) -> bool {
        match self.state.queue.pop() {
            Ok(runnable) => {
                // Run the task.
                runnable.run();
                true
            }
            Err(_) => false,
        }
    }

    /// Spawns a task onto the executor.
    ///
    /// The task is registered in the set of active tasks, scheduled right away
    /// (which also triggers the notifier, if one is installed) and its handle
    /// is returned to the caller.
    pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        let mut active = borrow_mut!(self.state.active);

        // Remove the task from the set of active tasks when the future finishes.
        // `index` is the slot the upcoming `insert` below will occupy; nothing
        // else can insert in between because `active` stays borrowed until then.
        let index = active.vacant_entry().key();
        let state = Rc::clone(&self.state);
        let future = async move {
            let _guard = CallOnDrop(move || drop(borrow_mut!(state.active).try_remove(index)));
            future.await
        };

        let notifier = borrow!(self.state.notifier).clone();

        // Create the task and register it in the set of active tasks.
        //
        // SAFETY (as argued by the original authors — NOTE(review): re-verify):
        // * The safe `async_task::spawn` cannot be used because it requires a
        //   `Send` future, whereas local futures are not `Send`; they are meant
        //   to run only on the thread where the executor was created.
        // * Wakers, however, may be invoked from other threads, so the
        //   `schedule` callback really has to be `Send + Sync`.
        //   `spawn_unchecked` does not check that, so the callback is built by
        //   `Self::schedule`, whose return type spells those bounds out in its
        //   signature and lets the compiler enforce them.
        // * NOTE(review): the callback borrows `self.state.queue`; confirm that
        //   no waker can outlive the executor state.
        let (runnable, task) = unsafe {
            async_task::spawn_unchecked(future, Self::schedule(&self.state.queue, notifier))
        };
        active.insert(runnable.waker());

        // Release the borrow before scheduling: scheduling invokes the external
        // notifier, and a notifier that re-enters the executor (running or
        // spawning tasks) would otherwise hit an already-borrowed `active` set.
        drop(active);

        runnable.schedule();
        task
    }

    /// Returns a function that schedules a runnable task when it gets woken up.
    ///
    /// The returned function has to be `Sync + Send`, because our local futures
    /// might get woken up by other threads. On every call it pushes the
    /// runnable onto `queue` and then, if a notifier was supplied, notifies it.
    fn schedule(
        queue: &ConcurrentQueue<Runnable>,
        notifier: Option<Arc<dyn Notifier + Send + Sync>>,
    ) -> impl Fn(Runnable) + Sync + Send + '_ {
        move |runnable| {
            queue
                .push(runnable)
                .expect("the executor queue is unbounded and is never closed");
            // Borrow the notifier instead of cloning the `Arc` on every wake-up.
            if let Some(notifier) = &notifier {
                notifier.notify();
            }
        }
    }
}
impl Drop for LocalExecutor {
fn drop(&mut self) {
let mut active = borrow_mut!(self.state.active);
for w in active.drain() {
w.wake();
}
drop(active);
while self.state.queue.pop().is_ok() {}
}
}
impl Default for LocalExecutor {
fn default() -> LocalExecutor {
LocalExecutor::new()
}
}
/// The state of an executor.
///
/// Held behind an `Rc` by the executor handle; each spawned task also keeps a
/// clone so that it can unregister itself from `active` when it finishes.
struct State {
/// The queue of runnable tasks waiting to be polled. Named "global" after the
/// upstream multi-threaded executor this code was forked from.
queue: ConcurrentQueue<Runnable>,
/// Optional notify callback, invoked after a task has been pushed onto `queue`.
notifier: RefCell<Option<Arc<dyn Notifier + Send + Sync>>>,
/// Wakers of the currently active (spawned and not yet finished) tasks.
active: RefCell<Slab<Waker>>,
}
impl State {
    /// Builds the state of a fresh executor: an unbounded empty queue, no
    /// notifier and no active tasks.
    fn new() -> Self {
        Self {
            active: RefCell::new(Slab::new()),
            notifier: RefCell::new(None),
            queue: ConcurrentQueue::unbounded(),
        }
    }

    /// Stores `notifier` as the current notify callback, replacing any
    /// previously stored one.
    fn set_notifier(&self, notifier: impl Notifier + Send + Sync + 'static) {
        let shared: Arc<dyn Notifier + Send + Sync> = Arc::new(notifier);
        *borrow_mut!(self.notifier) = Some(shared);
    }
}
/// Callback used to tell an external event loop that new pending tasks are
/// available for running.
///
/// `LocalExecutor::set_notifier` additionally requires implementors to be
/// `Send + Sync + 'static`, since tasks might get woken up (and therefore
/// scheduled) by other threads.
pub trait Notifier {
/// Invoked after a runnable task has been pushed onto the executor's queue.
fn notify(&self);
}
/// Shared `Debug` formatting for `LocalExecutor`.
///
/// Writes a struct named `name` with two fields: `active`, the number of
/// registered active tasks, and `global_tasks`, the number of runnables
/// currently sitting in the queue.
fn debug_executor(executor: &LocalExecutor, name: &str, f: &mut fmt::Formatter) -> fmt::Result {
    let active_count = borrow!(executor.state.active).len();
    let queued_count = executor.state.queue.len();

    let mut output = f.debug_struct(name);
    output.field("active", &active_count);
    output.field("global_tasks", &queued_count);
    output.finish()
}
/// Guard that invokes the wrapped closure when it goes out of scope.
///
/// The `Fn()` bound lives on the struct itself because a `Drop` impl is not
/// allowed to require more than the type definition does.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    /// Calls the stored closure exactly once, at drop time.
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}