diff --git a/Cargo.lock b/Cargo.lock index 9a66b3c..d20af89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,6 +808,7 @@ dependencies = [ "cdn-proto", "clap", "jf-signature", + "parking_lot", "rand", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index f1e9ea5..862eda6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ "ansi", ] } clap = { version = "4", features = ["derive"] } +parking_lot = "0.12" prometheus = { version = "0.13", default-features = false } lazy_static = "1" async-std = { version = "1", default-features = false, features = [ diff --git a/cdn-broker/Cargo.toml b/cdn-broker/Cargo.toml index 2c87b4e..a113f7f 100644 --- a/cdn-broker/Cargo.toml +++ b/cdn-broker/Cargo.toml @@ -59,5 +59,5 @@ derivative.workspace = true dashmap = { version = "6", default-features = false } rand.workspace = true local-ip-address = "0.6" -parking_lot = "0.12" +parking_lot.workspace = true portpicker = "0.1" diff --git a/cdn-client/Cargo.toml b/cdn-client/Cargo.toml index 5afab6c..8e63291 100644 --- a/cdn-client/Cargo.toml +++ b/cdn-client/Cargo.toml @@ -32,3 +32,4 @@ tracing-subscriber.workspace = true rand.workspace = true tracing.workspace = true clap.workspace = true +parking_lot.workspace = true diff --git a/cdn-client/src/retry.rs b/cdn-client/src/retry.rs index 83ee446..8cc6d96 100644 --- a/cdn-client/src/retry.rs +++ b/cdn-client/src/retry.rs @@ -23,7 +23,9 @@ use cdn_proto::{ def::{ConnectionDef, Protocol, Scheme}, error::{Error, Result}, message::{Message, Topic}, + util::AbortOnDropHandle, }; +use parking_lot::Mutex; use tokio::{ spawn, sync::{RwLock, Semaphore}, @@ -58,6 +60,9 @@ pub struct Inner { /// The semaphore to ensure only one reconnection is happening at a time connecting_guard: Arc, + /// The (optional) task that is responsible for reconnecting + reconnection_task: Arc>>>, + /// The keypair to use when authenticating pub keypair: KeyPair>, @@ -168,7 +173,7 @@ impl Retry { if let Ok(permit) = Arc::clone(&self.inner.connecting_guard).try_acquire_owned() { // We were the first to try reconnecting, spawn a reconnection task let inner = self.inner.clone(); - spawn(async move { + let reconnection_task = AbortOnDropHandle(spawn(async move { let mut connection = inner.connection.write().await; // Forever, @@ -189,7 +194,10 @@ impl Retry { } } drop(permit); - }); + })); + + // Update the reconnection task + *self.inner.reconnection_task.lock() = Some(reconnection_task); } // If we are in the middle of reconnecting, return an error @@ -249,6 +257,7 @@ impl Retry { use_local_authority, connection: Arc::default(), connecting_guard: Arc::from(Semaphore::const_new(1)), + reconnection_task: Arc::default(), keypair, subscribed_topics, }),