Skip to content

Commit

Permalink
if any STT service errors, clear all queued workers
Browse files Browse the repository at this point in the history
  • Loading branch information
tazz4843 committed Dec 11, 2023
1 parent 702db73 commit e5aa3ff
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions scripty_stt/src/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ impl LoadBalancer {
}

let workers = Arc::new(DashMap::new());
let (purge_tx, purge_rx) = flume::bounded(1);
for (n, addr) in peer_addresses.into_iter().enumerate() {
workers.insert(n, LoadBalancedStream::new(addr).await?);
workers.insert(n, LoadBalancedStream::new(addr, purge_tx.clone()).await?);
}
let (new_worker_tx, new_worker_rx) = flume::unbounded();
let this = Self {
Expand All @@ -95,6 +96,20 @@ impl LoadBalancer {
};
let t2 = this.clone();
tokio::spawn(t2.new_worker_background_task(new_worker_rx));
let t3 = this.clone();
tokio::spawn(async move {
loop {
if let Ok(_) = purge_rx.recv_async().await {

Check warning on line 102 in scripty_stt/src/load_balancer.rs

View workflow job for this annotation

GitHub Actions / Clippy Output

redundant pattern matching, consider using `is_ok()`

warning: redundant pattern matching, consider using `is_ok()` --> scripty_stt/src/load_balancer.rs:102:12 | 102 | if let Ok(_) = purge_rx.recv_async().await { | -------^^^^^------------------------------ help: try: `if (purge_rx.recv_async().await).is_ok()` | = note: this will change drop order of the result, as well as all temporaries = note: add `#[allow(clippy::redundant_pattern_matching)]` if this is important = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_pattern_matching = note: `#[warn(clippy::redundant_pattern_matching)]` on by default
t3.queued_workers.lock().clear();
// request the queue be refilled
if let Err(_) = t3.new_worker_tx.send_async(()).await {

Check warning on line 105 in scripty_stt/src/load_balancer.rs

View workflow job for this annotation

GitHub Actions / Clippy Output

redundant pattern matching, consider using `is_err()`

warning: redundant pattern matching, consider using `is_err()` --> scripty_stt/src/load_balancer.rs:105:13 | 105 | if let Err(_) = t3.new_worker_tx.send_async(()).await { | -------^^^^^^---------------------------------------- help: try: `if (t3.new_worker_tx.send_async(()).await).is_err()` | = note: this will change drop order of the result, as well as all temporaries = note: add `#[allow(clippy::redundant_pattern_matching)]` if this is important = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_pattern_matching
break error!(
"error sending new worker request: all client queues dropped"
);
}
}
}
});
Ok(this)
}

Expand Down Expand Up @@ -262,7 +277,10 @@ impl LoadBalancedStream {
.await
}

pub async fn new(peer_address: SocketAddr) -> Result<Self, ModelError> {
pub async fn new(
peer_address: SocketAddr,
purge_tx: flume::Sender<()>,
) -> Result<Self, ModelError> {
// open a connection to the remote
info!("trying to connect to STT service at {}", peer_address);
let peer_stream = TcpStream::connect(peer_address).await?;
Expand Down Expand Up @@ -464,6 +482,12 @@ impl LoadBalancedStream {
warn!("got error from stream pair");
wfns2.store(true, Ordering::Relaxed);

// immediately purge all queued workers as we have bad state
if let Err(_) = purge_tx.send_async(()).await {

Check warning on line 486 in scripty_stt/src/load_balancer.rs

View workflow job for this annotation

GitHub Actions / Clippy Output

redundant pattern matching, consider using `is_err()`

warning: redundant pattern matching, consider using `is_err()` --> scripty_stt/src/load_balancer.rs:486:12 | 486 | if let Err(_) = purge_tx.send_async(()).await { | -------^^^^^^-------------------------------- help: try: `if (purge_tx.send_async(()).await).is_err()` | = note: this will change drop order of the result, as well as all temporaries = note: add `#[allow(clippy::redundant_pattern_matching)]` if this is important = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_pattern_matching
error!("error sending purge request: all client queues dropped");
break;
}

// start a new connection to the server
let mut peer_stream = None;
for n in 0..=12 {
Expand Down

0 comments on commit e5aa3ff

Please sign in to comment.