Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary poison messages #6

Merged
merged 2 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/driver/tasks/disposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,5 @@ use tracing::instrument;
/// to prevent deadline misses.
#[instrument(skip(mix_rx))]
pub(crate) fn runner(mix_rx: Receiver<DisposalMessage>) {
loop {
match mix_rx.recv() {
Err(_) | Ok(DisposalMessage::Poison) => break,
_ => {},
}
}
while mix_rx.recv().is_ok() {}
}
26 changes: 12 additions & 14 deletions src/driver/tasks/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe
let mut states: Vec<TrackState> = vec![];
let mut handles: Vec<TrackHandle> = vec![];

loop {
match evt_rx.recv_async().await {
Ok(EventMessage::AddGlobalEvent(data)) => {
while let Ok(msg) = evt_rx.recv_async().await {
match msg {
EventMessage::AddGlobalEvent(data) => {
info!("Global event added.");
global.add_event(data);
},
Ok(EventMessage::AddTrackEvent(i, data)) => {
EventMessage::AddTrackEvent(i, data) => {
info!("Adding event to track {}.", i);

let event_store = events
Expand All @@ -32,7 +32,7 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe

event_store.add_event(data, state.position);
},
Ok(EventMessage::FireCoreEvent(ctx)) => {
EventMessage::FireCoreEvent(ctx) => {
let ctx = ctx.to_user_context();
let evt = ctx
.to_core_event()
Expand All @@ -42,17 +42,17 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe

global.fire_core_event(evt, ctx).await;
},
Ok(EventMessage::RemoveGlobalEvents) => {
EventMessage::RemoveGlobalEvents => {
global.remove_handlers();
},
Ok(EventMessage::AddTrack(store, state, handle)) => {
EventMessage::AddTrack(store, state, handle) => {
events.push(store);
states.push(state);
handles.push(handle);

info!("Event state for track {} added", events.len());
},
Ok(EventMessage::ChangeState(i, change)) => {
EventMessage::ChangeState(i, change) => {
let max_states = states.len();
debug!(
"Changing state for track {} of {}: {:?}",
Expand Down Expand Up @@ -107,27 +107,25 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe
},
}
},
Ok(EventMessage::RemoveTrack(i)) => {
EventMessage::RemoveTrack(i) => {
info!("Event state for track {} of {} removed.", i, events.len());

events.swap_remove(i);
states.swap_remove(i);
handles.swap_remove(i);
},
Ok(EventMessage::RemoveAllTracks) => {
EventMessage::RemoveAllTracks => {
info!("Event state for all tracks removed.");

events.clear();
states.clear();
handles.clear();
},
Ok(EventMessage::Tick) => {
EventMessage::Tick => {
// NOTE: this should fire saved up blocks of state change evts.
global.tick(&mut events, &mut states, &mut handles).await;
},
Err(_) | Ok(EventMessage::Poison) => {
break;
},
EventMessage::Poison => break,
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/driver/tasks/message/disposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@ use crate::{driver::tasks::mixer::InternalTrack, tracks::TrackHandle};
pub enum DisposalMessage {
Track(Box<InternalTrack>),
Handle(TrackHandle),

Poison,
}
7 changes: 0 additions & 7 deletions src/driver/tasks/message/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ pub struct MixerConnection {
pub udp_tx: Sender<UdpTxMessage>,
}

impl Drop for MixerConnection {
fn drop(&mut self) {
drop(self.udp_rx.send(UdpRxMessage::Poison));
drop(self.udp_tx.send(UdpTxMessage::Poison));
}
}

pub enum MixerMessage {
AddTrack(TrackContext),
SetTrack(Option<TrackContext>),
Expand Down
2 changes: 0 additions & 2 deletions src/driver/tasks/message/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@ use crate::driver::Config;
pub enum UdpRxMessage {
SetConfig(Config),
ReplaceInterconnect(Interconnect),

Poison,
}
6 changes: 2 additions & 4 deletions src/driver/tasks/message/udp_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#![allow(missing_docs)]

pub enum UdpTxMessage {
Packet(Vec<u8>), // TODO: do something cheaper.
Poison,
}
// TODO: do something cheaper.
pub type UdpTxMessage = Vec<u8>;
3 changes: 0 additions & 3 deletions src/driver/tasks/message/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
use super::Interconnect;
use crate::ws::WsStream;

#[allow(dead_code)]
pub enum WsMessage {
Ws(Box<WsStream>),
ReplaceInterconnect(Interconnect),
SetKeepalive(f64),
Speaking(bool),

Poison,
}
12 changes: 3 additions & 9 deletions src/driver/tasks/mixer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,7 @@ impl Mixer {
// Test mode: send unencrypted (compressed) packets to local receiver.
drop(tx.send(self.packet[..index].to_vec().into()));
} else {
conn.udp_tx
.send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?;
conn.udp_tx.send(self.packet[..index].to_vec())?;
}

#[cfg(not(test))]
Expand All @@ -794,8 +793,7 @@ impl Mixer {
// TODO: This is dog slow, don't do this.
// Can we replace this with a shared ring buffer + semaphore?
// or the BBQueue crate?
conn.udp_tx
.send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?;
conn.udp_tx.send(self.packet[..index].to_vec())?;
}

let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
Expand Down Expand Up @@ -923,9 +921,5 @@ pub(crate) fn runner(
async_handle: Handle,
config: Config,
) {
let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config);

mixer.run();

drop(mixer.disposer.send(DisposalMessage::Poison));
Mixer::new(mix_rx, async_handle, interconnect, config).run();
}
38 changes: 18 additions & 20 deletions src/driver/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
Config,
ConnectionInfo,
};
use flume::{Receiver, RecvError, Sender};
use flume::{Receiver, Sender};
use message::*;
use tokio::{runtime::Handle, spawn, time::sleep as tsleep};
use tracing::{debug, instrument, trace};
Expand Down Expand Up @@ -70,9 +70,9 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
let mut retrying = None;
let mut attempt_idx = 0;

loop {
match rx.recv_async().await {
Ok(CoreMessage::ConnectWithResult(info, tx)) => {
while let Ok(msg) = rx.recv_async().await {
match msg {
CoreMessage::ConnectWithResult(info, tx) => {
config = if let Some(new_config) = next_config.take() {
drop(
interconnect
Expand All @@ -98,7 +98,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
drop(tx.send(Ok(())));
}
},
Ok(CoreMessage::RetryConnect(retry_idx)) => {
CoreMessage::RetryConnect(retry_idx) => {
debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx);
if retry_idx == attempt_idx {
if let Some(progress) = retrying.take() {
Expand All @@ -108,7 +108,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}
}
},
Ok(CoreMessage::Disconnect) => {
CoreMessage::Disconnect => {
let last_conn = connection.take();
drop(interconnect.mixer.send(MixerMessage::DropConn));
drop(interconnect.mixer.send(MixerMessage::RebuildEncoder));
Expand All @@ -123,7 +123,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
)));
}
},
Ok(CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason)) => {
CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason) => {
// if idx is not a match, quash reason
// (i.e., prevent users from mistakenly trying to reconnect for an *old* dead conn).
// if it *is* a match, the conn needs to die!
Expand All @@ -144,32 +144,32 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}),
)));
},
Ok(CoreMessage::SetTrack(s)) => {
CoreMessage::SetTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::SetTrack(s)));
},
Ok(CoreMessage::AddTrack(s)) => {
CoreMessage::AddTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::AddTrack(s)));
},
Ok(CoreMessage::SetBitrate(b)) => {
CoreMessage::SetBitrate(b) => {
drop(interconnect.mixer.send(MixerMessage::SetBitrate(b)));
},
Ok(CoreMessage::SetConfig(mut new_config)) => {
CoreMessage::SetConfig(mut new_config) => {
next_config = Some(new_config.clone());

new_config.make_safe(&config, connection.is_some());

drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config)));
},
Ok(CoreMessage::AddEvent(evt)) => {
CoreMessage::AddEvent(evt) => {
drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt)));
},
Ok(CoreMessage::RemoveGlobalEvents) => {
CoreMessage::RemoveGlobalEvents => {
drop(interconnect.events.send(EventMessage::RemoveGlobalEvents));
},
Ok(CoreMessage::Mute(m)) => {
CoreMessage::Mute(m) => {
drop(interconnect.mixer.send(MixerMessage::SetMute(m)));
},
Ok(CoreMessage::Reconnect) => {
CoreMessage::Reconnect => {
if let Some(mut conn) = connection.take() {
// try once: if interconnect, try again.
// if still issue, full connect.
Expand Down Expand Up @@ -208,20 +208,18 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}
}
},
Ok(CoreMessage::FullReconnect) =>
CoreMessage::FullReconnect =>
if let Some(conn) = connection.take() {
let info = conn.info.clone();

connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
},
Ok(CoreMessage::RebuildInterconnect) => {
CoreMessage::RebuildInterconnect => {
interconnect.restart_volatile_internals();
},
Err(RecvError::Disconnected) | Ok(CoreMessage::Poison) => {
break;
},
CoreMessage::Poison => break,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl UdpRx {
Ok(UdpRxMessage::SetConfig(c)) => {
self.config = c;
},
Ok(UdpRxMessage::Poison) | Err(_) => break,
Err(flume::RecvError::Disconnected) => break,
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/driver/tasks/udp_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,12 @@ impl UdpTx {
}
ka_time += UDP_KEEPALIVE_GAP;
},
Ok(Ok(UdpTxMessage::Packet(p))) =>
Ok(Ok(p)) =>
if let Err(e) = self.udp_tx.send(&p[..]).await {
error!("Fatal UDP packet send error: {:?}.", e);
break;
},
Ok(Err(e)) => {
error!("Fatal UDP packet receive error: {:?}.", e);
break;
},
Ok(Ok(UdpTxMessage::Poison)) => {
Ok(Err(flume::RecvError::Disconnected)) => {
break;
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl AuxNetwork {
}
}
},
Err(_) | Ok(WsMessage::Poison) => {
Err(flume::RecvError::Disconnected) => {
break;
},
}
Expand Down