Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove exit-future #5183

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ edition = "2021"

[workspace.dependencies]
arbitrary = { version = "1", features = ["derive"] }
async-channel = "1.9.0"
bincode = "1"
bitvec = "1"
byteorder = "1"
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ execution_layer = { workspace = true }
sensitive_url = { workspace = true }
superstruct = { workspace = true }
hex = { workspace = true }
exit-future = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
slog-term = { workspace = true }
slog-async = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ kzg = { workspace = true }
state_processing = { workspace = true }
superstruct = { workspace = true }
lru = { workspace = true }
exit-future = { workspace = true }
tree_hash = { workspace = true }
tree_hash_derive = { workspace = true }
parking_lot = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
discv5 = { workspace = true }
unsigned-varint = { version = "0.6", features = ["codec"] }
ssz_types = { workspace = true }
Expand Down Expand Up @@ -55,7 +56,6 @@ hex_fmt = "0.3.0"
instant = "0.1.12"
quick-protobuf = "0.8"
void = "1.0.2"
async-channel = "1.9.0"
asynchronous-codec = "0.7.0"
base64 = "0.21.5"
libp2p-mplex = "0.41"
Expand All @@ -70,7 +70,6 @@ features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "s
slog-term = { workspace = true }
slog-async = { workspace = true }
tempfile = { workspace = true }
exit-future = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-std = { version = "1.6.3", features = ["unstable"] }
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Libp2pInstance(
LibP2PService<ReqId, E>,
#[allow(dead_code)]
// This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute.
exit_future::Signal,
async_channel::Sender<()>,
);

impl std::ops::Deref for Libp2pInstance {
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn build_libp2p_instance(
let config = build_config(boot_nodes);
// launch libp2p service

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx);
let libp2p_context = lighthouse_network::Context {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ edition = { workspace = true }
sloggers = { workspace = true }
genesis = { workspace = true }
matches = "0.1.8"
exit-future = { workspace = true }
slog-term = { workspace = true }
slog-async = { workspace = true }
eth2 = { workspace = true }

[dependencies]
async-channel = { workspace = true }
beacon_chain = { workspace = true }
store = { workspace = true }
lighthouse_network = { workspace = true }
Expand Down Expand Up @@ -56,4 +56,4 @@ environment = { workspace = true }
# NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill
disable-backfill = []
fork_from_env = ["beacon_chain/fork_from_env"]
portable = ["beacon_chain/portable"]
portable = ["beacon_chain/portable"]
4 changes: 2 additions & 2 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {

let runtime = Arc::new(Runtime::new().unwrap());

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down Expand Up @@ -139,7 +139,7 @@ mod tests {

// Build network service.
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
let (_, exit) = exit_future::signal();
let (_, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down
2 changes: 1 addition & 1 deletion common/task_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
futures = { workspace = true }
exit-future = { workspace = true }
lazy_static = { workspace = true }
lighthouse_metrics = { workspace = true }
sloggers = { workspace = true }
61 changes: 32 additions & 29 deletions common/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
handle_provider: HandleProvider,
/// The receiver exit future which on receiving shuts down the task
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
///
Expand All @@ -93,7 +93,7 @@ impl TaskExecutor {
/// crate).
pub fn new<T: Into<HandleProvider>>(
handle: T,
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
log: slog::Logger,
signal_tx: Sender<ShutdownReason>,
) -> Self {
Expand Down Expand Up @@ -159,8 +159,8 @@ impl TaskExecutor {

/// Spawn a future on the tokio runtime.
///
/// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding
/// exit_future `Signal` is fired/dropped.
/// The future is wrapped in an `async-channel::Receiver`. The task is cancelled when the corresponding
/// Sender is dropped.
///
/// The future is monitored via another spawned future to ensure that it doesn't panic. In case
/// of a panic, the executor will be shut down via `self.signal_tx`.
Expand All @@ -172,9 +172,9 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver`
/// like [spawn](#method.spawn).
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
/// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to
/// ensure that the task gets canceled appropriately.
/// This function generates prometheus metrics on number of tasks and task duration.
///
Expand Down Expand Up @@ -213,40 +213,39 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
/// join handle to the future.
/// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
/// The task is canceled when the corresponding async-channel is dropped.
///
/// This function generates prometheus metrics on number of tasks and task duration.
pub fn spawn_handle<R: Send + 'static>(
&self,
task: impl Future<Output = R> + Send + 'static,
name: &'static str,
) -> Option<tokio::task::JoinHandle<Option<R>>> {
let exit = self.exit.clone();
let exit = self.exit();
let log = self.log.clone();

if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
// Task is shutdown before it completes if `exit` receives
let int_gauge_1 = int_gauge.clone();
let future = future::select(Box::pin(task), exit).then(move |either| {
let result = match either {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
futures::future::ready(result)
});

int_gauge.inc();
if let Some(handle) = self.handle() {
Some(handle.spawn(future))
Some(handle.spawn(async move {
futures::pin_mut!(exit);
let result = match future::select(Box::pin(task), exit).await {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
result
}))
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
None
Expand Down Expand Up @@ -324,7 +323,7 @@ impl TaskExecutor {
metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]);
let log = self.log.clone();
let handle = self.handle()?;
let exit = self.exit.clone();
let exit = self.exit();

debug!(
log,
Expand Down Expand Up @@ -362,9 +361,13 @@ impl TaskExecutor {
self.handle_provider.handle()
}

/// Returns a copy of the `exit_future::Exit`.
pub fn exit(&self) -> exit_future::Exit {
self.exit.clone()
/// Returns a future that completes when `async-channel::Sender` is dropped or () is sent,
/// which translates to the exit signal being triggered.
pub fn exit(&self) -> impl Future<Output = ()> {
let exit = self.exit.clone();
async move {
let _ = exit.recv().await;
}
}

/// Get a channel to request shutting down.
Expand Down
4 changes: 2 additions & 2 deletions common/task_executor/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::runtime;
/// This struct should never be used in production, only testing.
pub struct TestRuntime {
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
_runtime_shutdown: async_channel::Sender<()>,
pub task_executor: TaskExecutor,
pub log: Logger,
}
Expand All @@ -24,7 +24,7 @@ impl Default for TestRuntime {
/// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the
/// `Self` is dropped.
fn default() -> Self {
let (runtime_shutdown, exit) = exit_future::signal();
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let log = null_logger().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion lighthouse/environment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
sloggers = { workspace = true }
Expand All @@ -17,7 +18,6 @@ slog-term = { workspace = true }
slog-async = { workspace = true }
futures = { workspace = true }
slog-json = "2.3.0"
exit-future = { workspace = true }
serde = { workspace = true }

[target.'cfg(not(target_family = "unix"))'.dependencies]
Expand Down
Loading
Loading