From e4dff2aaae567ced3d64277bf7f2984440d77002 Mon Sep 17 00:00:00 2001 From: Gnome Date: Thu, 23 Jun 2022 13:17:50 +0100 Subject: [PATCH 1/2] Remove unnecessary poison messages --- src/driver/mod.rs | 8 ------ src/driver/tasks/disposal.rs | 7 +----- src/driver/tasks/events.rs | 26 +++++++++---------- src/driver/tasks/message/core.rs | 1 - src/driver/tasks/message/disposal.rs | 2 -- src/driver/tasks/message/mixer.rs | 7 ------ src/driver/tasks/message/udp_rx.rs | 2 -- src/driver/tasks/message/udp_tx.rs | 6 ++--- src/driver/tasks/message/ws.rs | 3 --- src/driver/tasks/mixer/mod.rs | 12 +++------ src/driver/tasks/mod.rs | 37 +++++++++++++--------------- src/driver/tasks/udp_rx.rs | 2 +- src/driver/tasks/udp_tx.rs | 8 ++---- src/driver/tasks/ws.rs | 2 +- 14 files changed, 39 insertions(+), 84 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 20a3f02b5..f12a41275 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -312,14 +312,6 @@ impl Default for Driver { } } -impl Drop for Driver { - /// Leaves the current connected voice channel, if connected to one, and - /// forgets all configurations relevant to this Handler. - fn drop(&mut self) { - drop(self.sender.send(CoreMessage::Poison)); - } -} - /// Future for a call to [`Driver::connect`]. /// /// This future awaits the *result* of a connection; the driver diff --git a/src/driver/tasks/disposal.rs b/src/driver/tasks/disposal.rs index b10a56f39..0b13014b0 100644 --- a/src/driver/tasks/disposal.rs +++ b/src/driver/tasks/disposal.rs @@ -9,10 +9,5 @@ use tracing::instrument; /// to prevent deadline misses. #[instrument(skip(mix_rx))] pub(crate) fn runner(mix_rx: Receiver) { - loop { - match mix_rx.recv() { - Err(_) | Ok(DisposalMessage::Poison) => break, - _ => {}, - } - } + while mix_rx.recv().is_ok() {} } diff --git a/src/driver/tasks/events.rs b/src/driver/tasks/events.rs index eaf464b93..9becd2db0 100644 --- a/src/driver/tasks/events.rs +++ b/src/driver/tasks/events.rs @@ -14,13 +14,13 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver = vec![]; let mut handles: Vec = vec![]; - loop { - match evt_rx.recv_async().await { - Ok(EventMessage::AddGlobalEvent(data)) => { + while let Ok(msg) = evt_rx.recv_async().await { + match msg { + EventMessage::AddGlobalEvent(data) => { info!("Global event added."); global.add_event(data); }, - Ok(EventMessage::AddTrackEvent(i, data)) => { + EventMessage::AddTrackEvent(i, data) => { info!("Adding event to track {}.", i); let event_store = events @@ -32,7 +32,7 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::FireCoreEvent(ctx) => { let ctx = ctx.to_user_context(); let evt = ctx .to_core_event() @@ -42,17 +42,17 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::RemoveGlobalEvents => { global.remove_handlers(); }, - Ok(EventMessage::AddTrack(store, state, handle)) => { + EventMessage::AddTrack(store, state, handle) => { events.push(store); states.push(state); handles.push(handle); info!("Event state for track {} added", events.len()); }, - Ok(EventMessage::ChangeState(i, change)) => { + EventMessage::ChangeState(i, change) => { let max_states = states.len(); debug!( "Changing state for track {} of {}: {:?}", @@ -107,27 +107,25 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::RemoveTrack(i) => { info!("Event state for track {} of {} removed.", i, events.len()); events.swap_remove(i); states.swap_remove(i); handles.swap_remove(i); }, - Ok(EventMessage::RemoveAllTracks) => { + EventMessage::RemoveAllTracks => { info!("Event state for all tracks removed."); events.clear(); states.clear(); handles.clear(); }, - Ok(EventMessage::Tick) => { + EventMessage::Tick => { // NOTE: this should fire saved up blocks of state change evts. global.tick(&mut events, &mut states, &mut handles).await; }, - Err(_) | Ok(EventMessage::Poison) => { - break; - }, + EventMessage::Poison => break, } } diff --git a/src/driver/tasks/message/core.rs b/src/driver/tasks/message/core.rs index b6145264a..b7a2e1650 100644 --- a/src/driver/tasks/message/core.rs +++ b/src/driver/tasks/message/core.rs @@ -23,7 +23,6 @@ pub enum CoreMessage { Reconnect, FullReconnect, RebuildInterconnect, - Poison, } pub struct TrackContext { diff --git a/src/driver/tasks/message/disposal.rs b/src/driver/tasks/message/disposal.rs index 3940046e7..795e5bcef 100644 --- a/src/driver/tasks/message/disposal.rs +++ b/src/driver/tasks/message/disposal.rs @@ -5,6 +5,4 @@ use crate::{driver::tasks::mixer::InternalTrack, tracks::TrackHandle}; pub enum DisposalMessage { Track(Box), Handle(TrackHandle), - - Poison, } diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index e1221c1bd..9754e6baf 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -17,13 +17,6 @@ pub struct MixerConnection { pub udp_tx: Sender, } -impl Drop for MixerConnection { - fn drop(&mut self) { - drop(self.udp_rx.send(UdpRxMessage::Poison)); - drop(self.udp_tx.send(UdpTxMessage::Poison)); - } -} - pub enum MixerMessage { AddTrack(TrackContext), SetTrack(Option), diff --git a/src/driver/tasks/message/udp_rx.rs b/src/driver/tasks/message/udp_rx.rs index 9034090ae..12a6d0c31 100644 --- a/src/driver/tasks/message/udp_rx.rs +++ b/src/driver/tasks/message/udp_rx.rs @@ -6,6 +6,4 @@ use crate::driver::Config; pub enum UdpRxMessage { SetConfig(Config), ReplaceInterconnect(Interconnect), - - Poison, } diff --git a/src/driver/tasks/message/udp_tx.rs b/src/driver/tasks/message/udp_tx.rs index d3dbf3602..16b7ad1fd 100644 --- a/src/driver/tasks/message/udp_tx.rs +++ b/src/driver/tasks/message/udp_tx.rs @@ -1,6 +1,4 @@ #![allow(missing_docs)] -pub enum UdpTxMessage { - Packet(Vec), // TODO: do something cheaper. - Poison, -} +// TODO: do something cheaper. +pub type UdpTxMessage = Vec; diff --git a/src/driver/tasks/message/ws.rs b/src/driver/tasks/message/ws.rs index 1cd7e49e6..4faf68367 100644 --- a/src/driver/tasks/message/ws.rs +++ b/src/driver/tasks/message/ws.rs @@ -3,12 +3,9 @@ use super::Interconnect; use crate::ws::WsStream; -#[allow(dead_code)] pub enum WsMessage { Ws(Box), ReplaceInterconnect(Interconnect), SetKeepalive(f64), Speaking(bool), - - Poison, } diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 85762d199..b13b0a150 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -783,8 +783,7 @@ impl Mixer { // Test mode: send unencrypted (compressed) packets to local receiver. drop(tx.send(self.packet[..index].to_vec().into())); } else { - conn.udp_tx - .send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?; + conn.udp_tx.send(self.packet[..index].to_vec())?; } #[cfg(not(test))] @@ -794,8 +793,7 @@ impl Mixer { // TODO: This is dog slow, don't do this. // Can we replace this with a shared ring buffer + semaphore? // or the BBQueue crate? - conn.udp_tx - .send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?; + conn.udp_tx.send(self.packet[..index].to_vec())?; } let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( @@ -923,9 +921,5 @@ pub(crate) fn runner( async_handle: Handle, config: Config, ) { - let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config); - - mixer.run(); - - drop(mixer.disposer.send(DisposalMessage::Poison)); + Mixer::new(mix_rx, async_handle, interconnect, config).run(); } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index 275d3a318..4176cd1f0 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -21,7 +21,7 @@ use crate::{ Config, ConnectionInfo, }; -use flume::{Receiver, RecvError, Sender}; +use flume::{Receiver, Sender}; use message::*; use tokio::{runtime::Handle, spawn, time::sleep as tsleep}; use tracing::{debug, instrument, trace}; @@ -70,9 +70,9 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + while let Ok(msg) = rx.recv_async().await { + match msg { + CoreMessage::ConnectWithResult(info, tx) => { config = if let Some(new_config) = next_config.take() { drop( interconnect @@ -98,7 +98,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::RetryConnect(retry_idx) => { debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx); if retry_idx == attempt_idx { if let Some(progress) = retrying.take() { @@ -108,7 +108,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::Disconnect => { let last_conn = connection.take(); drop(interconnect.mixer.send(MixerMessage::DropConn)); drop(interconnect.mixer.send(MixerMessage::RebuildEncoder)); @@ -123,7 +123,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason) => { // if idx is not a match, quash reason // (i.e., prevent users from mistakenly trying to reconnect for an *old* dead conn). // if it *is* a match, the conn needs to die! @@ -144,32 +144,32 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::SetTrack(s) => { drop(interconnect.mixer.send(MixerMessage::SetTrack(s))); }, - Ok(CoreMessage::AddTrack(s)) => { + CoreMessage::AddTrack(s) => { drop(interconnect.mixer.send(MixerMessage::AddTrack(s))); }, - Ok(CoreMessage::SetBitrate(b)) => { + CoreMessage::SetBitrate(b) => { drop(interconnect.mixer.send(MixerMessage::SetBitrate(b))); }, - Ok(CoreMessage::SetConfig(mut new_config)) => { + CoreMessage::SetConfig(mut new_config) => { next_config = Some(new_config.clone()); new_config.make_safe(&config, connection.is_some()); drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config))); }, - Ok(CoreMessage::AddEvent(evt)) => { + CoreMessage::AddEvent(evt) => { drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt))); }, - Ok(CoreMessage::RemoveGlobalEvents) => { + CoreMessage::RemoveGlobalEvents => { drop(interconnect.events.send(EventMessage::RemoveGlobalEvents)); }, - Ok(CoreMessage::Mute(m)) => { + CoreMessage::Mute(m) => { drop(interconnect.mixer.send(MixerMessage::SetMute(m))); }, - Ok(CoreMessage::Reconnect) => { + CoreMessage::Reconnect => { if let Some(mut conn) = connection.take() { // try once: if interconnect, try again. // if still issue, full connect. @@ -208,7 +208,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender + CoreMessage::FullReconnect => if let Some(conn) = connection.take() { let info = conn.info.clone(); @@ -216,12 +216,9 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::RebuildInterconnect => { interconnect.restart_volatile_internals(); }, - Err(RecvError::Disconnected) | Ok(CoreMessage::Poison) => { - break; - }, } } diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index bd024fc4a..c0bd3d2d1 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -259,7 +259,7 @@ impl UdpRx { Ok(UdpRxMessage::SetConfig(c)) => { self.config = c; }, - Ok(UdpRxMessage::Poison) | Err(_) => break, + Err(flume::RecvError::Disconnected) => break, } } } diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs index 88b685725..7eb9e852d 100644 --- a/src/driver/tasks/udp_tx.rs +++ b/src/driver/tasks/udp_tx.rs @@ -34,16 +34,12 @@ impl UdpTx { } ka_time += UDP_KEEPALIVE_GAP; }, - Ok(Ok(UdpTxMessage::Packet(p))) => + Ok(Ok(p)) => if let Err(e) = self.udp_tx.send(&p[..]).await { error!("Fatal UDP packet send error: {:?}.", e); break; }, - Ok(Err(e)) => { - error!("Fatal UDP packet receive error: {:?}.", e); - break; - }, - Ok(Ok(UdpTxMessage::Poison)) => { + Ok(Err(flume::RecvError::Disconnected)) => { break; }, } diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index c9137f361..c7a3626a3 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -140,7 +140,7 @@ impl AuxNetwork { } } }, - Err(_) | Ok(WsMessage::Poison) => { + Err(flume::RecvError::Disconnected) => { break; }, } From 63d2e14376361eba6c0923e7eb3bab5d082d02c2 Mon Sep 17 00:00:00 2001 From: Gnome Date: Fri, 24 Jun 2022 18:49:05 +0100 Subject: [PATCH 2/2] Re-add Poison for CoreMessage --- src/driver/mod.rs | 8 ++++++++ src/driver/tasks/message/core.rs | 1 + src/driver/tasks/mod.rs | 1 + 3 files changed, 10 insertions(+) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index f12a41275..20a3f02b5 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -312,6 +312,14 @@ impl Default for Driver { } } +impl Drop for Driver { + /// Leaves the current connected voice channel, if connected to one, and + /// forgets all configurations relevant to this Handler. + fn drop(&mut self) { + drop(self.sender.send(CoreMessage::Poison)); + } +} + /// Future for a call to [`Driver::connect`]. /// /// This future awaits the *result* of a connection; the driver diff --git a/src/driver/tasks/message/core.rs b/src/driver/tasks/message/core.rs index b7a2e1650..b6145264a 100644 --- a/src/driver/tasks/message/core.rs +++ b/src/driver/tasks/message/core.rs @@ -23,6 +23,7 @@ pub enum CoreMessage { Reconnect, FullReconnect, RebuildInterconnect, + Poison, } pub struct TrackContext { diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index 4176cd1f0..945c6e222 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -219,6 +219,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { interconnect.restart_volatile_internals(); }, + CoreMessage::Poison => break, } }