Skip to content

Commit

Permalink
remove exit-future usage,
Browse files Browse the repository at this point in the history
as it is non maintained, and replace with async-channel which is already in the repo.
  • Loading branch information
jxs committed Feb 6, 2024
1 parent 8530427 commit b29f6e5
Show file tree
Hide file tree
Showing 18 changed files with 60 additions and 74 deletions.
23 changes: 5 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ edition = "2021"

[workspace.dependencies]
arbitrary = { version = "1", features = ["derive"] }
async-channel = "1.9.0"
bincode = "1"
bitvec = "1"
byteorder = "1"
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ execution_layer = { workspace = true }
sensitive_url = { workspace = true }
superstruct = { workspace = true }
hex = { workspace = true }
exit-future = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
slog-term = { workspace = true }
slog-async = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ kzg = { workspace = true }
state_processing = { workspace = true }
superstruct = { workspace = true }
lru = { workspace = true }
exit-future = { workspace = true }
tree_hash = { workspace = true }
tree_hash_derive = { workspace = true }
parking_lot = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
discv5 = { workspace = true }
unsigned-varint = { version = "0.6", features = ["codec"] }
ssz_types = { workspace = true }
Expand Down Expand Up @@ -55,7 +56,6 @@ hex_fmt = "0.3.0"
instant = "0.1.12"
quick-protobuf = "0.8"
void = "1.0.2"
async-channel = "1.9.0"
asynchronous-codec = "0.7.0"
base64 = "0.21.5"
libp2p-mplex = "0.41"
Expand All @@ -70,7 +70,6 @@ features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "s
slog-term = { workspace = true }
slog-async = { workspace = true }
tempfile = { workspace = true }
exit-future = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-std = { version = "1.6.3", features = ["unstable"] }
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Libp2pInstance(
LibP2PService<ReqId, E>,
#[allow(dead_code)]
// This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute.
exit_future::Signal,
async_channel::Sender<()>,
);

impl std::ops::Deref for Libp2pInstance {
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn build_libp2p_instance(
let config = build_config(boot_nodes);
// launch libp2p service

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx);
let libp2p_context = lighthouse_network::Context {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ edition = { workspace = true }
sloggers = { workspace = true }
genesis = { workspace = true }
matches = "0.1.8"
exit-future = { workspace = true }
slog-term = { workspace = true }
slog-async = { workspace = true }
eth2 = { workspace = true }

[dependencies]
async-channel = { workspace = true }
beacon_chain = { workspace = true }
store = { workspace = true }
lighthouse_network = { workspace = true }
Expand Down Expand Up @@ -56,4 +56,4 @@ environment = { workspace = true }
# NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill
disable-backfill = []
fork_from_env = ["beacon_chain/fork_from_env"]
portable = ["beacon_chain/portable"]
portable = ["beacon_chain/portable"]
4 changes: 2 additions & 2 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {

let runtime = Arc::new(Runtime::new().unwrap());

let (signal, exit) = exit_future::signal();
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down Expand Up @@ -139,7 +139,7 @@ mod tests {

// Build network service.
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
let (_, exit) = exit_future::signal();
let (_, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
Expand Down
2 changes: 1 addition & 1 deletion common/task_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
futures = { workspace = true }
exit-future = { workspace = true }
lazy_static = { workspace = true }
lighthouse_metrics = { workspace = true }
sloggers = { workspace = true }
61 changes: 32 additions & 29 deletions common/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
handle_provider: HandleProvider,
/// The receiver exit future which on receiving shuts down the task
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
///
Expand All @@ -93,7 +93,7 @@ impl TaskExecutor {
/// crate).
pub fn new<T: Into<HandleProvider>>(
handle: T,
exit: exit_future::Exit,
exit: async_channel::Receiver<()>,
log: slog::Logger,
signal_tx: Sender<ShutdownReason>,
) -> Self {
Expand Down Expand Up @@ -159,8 +159,8 @@ impl TaskExecutor {

/// Spawn a future on the tokio runtime.
///
/// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding
/// exit_future `Signal` is fired/dropped.
/// The future is wrapped in an `async-channel::Receiver`. The task is cancelled when the corresponding
/// Sender is dropped.
///
/// The future is monitored via another spawned future to ensure that it doesn't panic. In case
/// of a panic, the executor will be shut down via `self.signal_tx`.
Expand All @@ -172,9 +172,9 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver`
/// like [spawn](#method.spawn).
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
/// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to
/// ensure that the task gets canceled appropriately.
/// This function generates prometheus metrics on number of tasks and task duration.
///
Expand Down Expand Up @@ -213,40 +213,39 @@ impl TaskExecutor {
}
}

/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
/// join handle to the future.
/// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
/// The task is canceled when the corresponding async-channel is dropped.
///
/// This function generates prometheus metrics on number of tasks and task duration.
pub fn spawn_handle<R: Send + 'static>(
&self,
task: impl Future<Output = R> + Send + 'static,
name: &'static str,
) -> Option<tokio::task::JoinHandle<Option<R>>> {
let exit = self.exit.clone();
let exit = self.exit();
let log = self.log.clone();

if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
// Task is shutdown before it completes if `exit` receives
let int_gauge_1 = int_gauge.clone();
let future = future::select(Box::pin(task), exit).then(move |either| {
let result = match either {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
futures::future::ready(result)
});

int_gauge.inc();
if let Some(handle) = self.handle() {
Some(handle.spawn(future))
Some(handle.spawn(async move {
futures::pin_mut!(exit);
let result = match future::select(Box::pin(task), exit).await {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Right(_) => {
debug!(log, "Async task shutdown, exit received"; "task" => name);
None
}
};
int_gauge_1.dec();
result
}))
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
None
Expand Down Expand Up @@ -324,7 +323,7 @@ impl TaskExecutor {
metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]);
let log = self.log.clone();
let handle = self.handle()?;
let exit = self.exit.clone();
let exit = self.exit();

debug!(
log,
Expand Down Expand Up @@ -362,9 +361,13 @@ impl TaskExecutor {
self.handle_provider.handle()
}

/// Returns a copy of the `exit_future::Exit`.
pub fn exit(&self) -> exit_future::Exit {
self.exit.clone()
/// Returns a future that completes when `async-channel::Sender` is dropped or () is sent,
/// which translates to the exit signal being triggered.
pub fn exit(&self) -> impl Future<Output = ()> {
let exit = self.exit.clone();
async move {
let _ = exit.recv().await;
}
}

/// Get a channel to request shutting down.
Expand Down
4 changes: 2 additions & 2 deletions common/task_executor/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::runtime;
/// This struct should never be used in production, only testing.
pub struct TestRuntime {
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
_runtime_shutdown: async_channel::Sender<()>,
pub task_executor: TaskExecutor,
pub log: Logger,
}
Expand All @@ -24,7 +24,7 @@ impl Default for TestRuntime {
/// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the
/// `Self` is dropped.
fn default() -> Self {
let (runtime_shutdown, exit) = exit_future::signal();
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let log = null_logger().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion lighthouse/environment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
sloggers = { workspace = true }
Expand All @@ -17,7 +18,6 @@ slog-term = { workspace = true }
slog-async = { workspace = true }
futures = { workspace = true }
slog-json = "2.3.0"
exit-future = { workspace = true }
serde = { workspace = true }

[target.'cfg(not(target_family = "unix"))'.dependencies]
Expand Down
Loading

0 comments on commit b29f6e5

Please sign in to comment.