From 26c9c9117c5c71fc0a3d654ad4cef70f60beb878 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sun, 8 Nov 2020 21:28:40 +0000 Subject: [PATCH] Handle Voice close codes, prevent Songbird spinning WS threads (#1068) Voice `CloseCode`s now map to a type rather than a collection of constants. Correct close code handling in this way terminates the websocket task when there is no likelihood of resuming, which was causing leftover tasks to spin at the `tokio::select` in some circumstances (i.e., ::leave, which keeps the `Driver` alive). --- src/driver/tasks/ws.rs | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index 6f9813c39..9b407f3ea 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -1,13 +1,16 @@ -use super::{error::Result, message::*}; +use super::message::*; use crate::{ events::CoreContext, model::{ payload::{Heartbeat, Speaking}, + CloseCode as VoiceCloseCode, Event as GatewayEvent, + FromPrimitive, SpeakingState, }, ws::{Error as WsError, ReceiverExt, SenderExt, WsStream}, }; +use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use flume::Receiver; use rand::random; use std::time::Duration; @@ -52,6 +55,7 @@ impl AuxNetwork { loop { let mut ws_error = false; + let mut should_reconnect = false; let hb = time::delay_until(next_heartbeat); @@ -59,7 +63,7 @@ impl AuxNetwork { _ = hb => { ws_error = match self.send_heartbeat().await { Err(e) => { - error!("Heartbeat send failure {:?}.", e); + should_reconnect = ws_error_is_not_final(e); true }, _ => false, @@ -73,7 +77,7 @@ impl AuxNetwork { false }, Err(e) => { - error!("Error processing ws {:?}.", e); + should_reconnect = ws_error_is_not_final(e); true }, Ok(Some(msg)) => { @@ -113,7 +117,7 @@ impl AuxNetwork { ws_error |= match ssu_status { Err(e) => { - error!("Issue sending speaking update {:?}.", e); + should_reconnect = ws_error_is_not_final(e); true }, _ => false, @@ -128,8 +132,13 @@ impl AuxNetwork { } if ws_error { - let _ = interconnect.core.send(CoreMessage::Reconnect); self.dont_send = true; + + if should_reconnect { + let _ = interconnect.core.send(CoreMessage::Reconnect); + } else { + break; + } } } } @@ -138,7 +147,7 @@ impl AuxNetwork { Instant::now() + self.heartbeat_interval } - async fn send_heartbeat(&mut self) -> Result<()> { + async fn send_heartbeat(&mut self) -> Result<(), WsError> { let nonce = random::(); self.last_heartbeat_nonce = Some(nonce); @@ -203,3 +212,21 @@ pub(crate) async fn runner( aux.run(&mut interconnect).await; info!("WS thread finished."); } + +fn ws_error_is_not_final(err: WsError) -> bool { + match err { + WsError::WsClosed(Some(frame)) => match frame.code { + CloseCode::Library(l) => + if let Some(code) = VoiceCloseCode::from_u16(l) { + code.should_resume() + } else { + true + }, + _ => true, + }, + e => { + error!("Error sending/receiving ws {:?}.", e); + true + }, + } +}