Skip to content

Commit

Permalink
Merge pull request #54 from EspressoSystems/kill-reconnection-on-drop
Browse files Browse the repository at this point in the history
Kill reconnection on drop
  • Loading branch information
rob-maron authored Aug 28, 2024
2 parents 69df01c + 4e81955 commit dd0596f
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
"ansi",
] }
clap = { version = "4", features = ["derive"] }
parking_lot = "0.12"
prometheus = { version = "0.13", default-features = false }
lazy_static = "1"
async-std = { version = "1", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ derivative.workspace = true
dashmap = { version = "6", default-features = false }
rand.workspace = true
local-ip-address = "0.6"
parking_lot = "0.12"
parking_lot.workspace = true
portpicker = "0.1"
1 change: 1 addition & 0 deletions cdn-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ tracing-subscriber.workspace = true
rand.workspace = true
tracing.workspace = true
clap.workspace = true
parking_lot.workspace = true
13 changes: 11 additions & 2 deletions cdn-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use cdn_proto::{
def::{ConnectionDef, Protocol, Scheme},
error::{Error, Result},
message::{Message, Topic},
util::AbortOnDropHandle,
};
use parking_lot::Mutex;
use tokio::{
spawn,
sync::{RwLock, Semaphore},
Expand Down Expand Up @@ -58,6 +60,9 @@ pub struct Inner<C: ConnectionDef> {
/// The semaphore to ensure only one reconnection is happening at a time
connecting_guard: Arc<Semaphore>,

/// The (optional) task that is responsible for reconnecting
reconnection_task: Arc<Mutex<Option<AbortOnDropHandle<()>>>>,

/// The keypair to use when authenticating
pub keypair: KeyPair<Scheme<C>>,

Expand Down Expand Up @@ -168,7 +173,7 @@ impl<C: ConnectionDef> Retry<C> {
if let Ok(permit) = Arc::clone(&self.inner.connecting_guard).try_acquire_owned() {
// We were the first to try reconnecting, spawn a reconnection task
let inner = self.inner.clone();
spawn(async move {
let reconnection_task = AbortOnDropHandle(spawn(async move {
let mut connection = inner.connection.write().await;

// Forever,
Expand All @@ -189,7 +194,10 @@ impl<C: ConnectionDef> Retry<C> {
}
}
drop(permit);
});
}));

// Update the reconnection task
*self.inner.reconnection_task.lock() = Some(reconnection_task);
}

// If we are in the middle of reconnecting, return an error
Expand Down Expand Up @@ -249,6 +257,7 @@ impl<C: ConnectionDef> Retry<C> {
use_local_authority,
connection: Arc::default(),
connecting_guard: Arc::from(Semaphore::const_new(1)),
reconnection_task: Arc::default(),
keypair,
subscribed_topics,
}),
Expand Down

0 comments on commit dd0596f

Please sign in to comment.