Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Driver: Fix transition to Live on connect #222

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/driver/scheduler/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,18 @@ impl Idle {
self.tasks.insert(id, task);
},
Ok(SchedulerMessage::Do(id, mix_msg)) => {
let now_live = mix_msg.is_mixer_now_live();
let maybe_live = mix_msg.is_mixer_maybe_live();
if let Some(task) = self.tasks.get_mut(&id) {
match task.handle_message(mix_msg) {
Ok(false) if now_live => {
let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None);
Ok(false) if maybe_live => {
if task.mixer.tracks.is_empty() {
// No tracks, likely due to SetConn.
// Recreate message forwarding task.
task.spawn_forwarder(self.tx.clone(), id);
} else {
let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None);
}
},
Ok(false) => {},
Ok(true) | Err(()) => self.to_cull.push(id),
Expand Down
5 changes: 3 additions & 2 deletions src/driver/scheduler/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl ParkedMixer {
_ = kill_rx.recv_async() => break,
msg = remote_rx.recv_async() => {
let exit = if let Ok(msg) = msg {
let remove_self = msg.is_mixer_now_live();
let remove_self = msg.is_mixer_maybe_live();
tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self
} else {
true
Expand All @@ -135,7 +135,8 @@ impl ParkedMixer {
pub fn handle_message(&mut self, msg: MixerMessage) -> Result<bool, ()> {
match msg {
MixerMessage::SetConn(conn, ssrc) => {
// Overridden because
// Overridden because payload-specific fields are carried
// externally on `ParkedMixer`.
self.ssrc = ssrc;
self.rtp_sequence = random::<u16>();
self.rtp_timestamp = random::<u32>();
Expand Down
7 changes: 5 additions & 2 deletions src/driver/tasks/message/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ pub enum MixerMessage {
}

impl MixerMessage {
pub fn is_mixer_now_live(&self) -> bool {
matches!(self, Self::AddTrack(_) | Self::SetTrack(Some(_)))
pub fn is_mixer_maybe_live(&self) -> bool {
matches!(
self,
Self::AddTrack(_) | Self::SetTrack(Some(_)) | Self::SetConn(..)
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/udp_rx/ssrc_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl SsrcState {
let mut out = vec![0; self.decode_size.len()];

for _ in 0..missed_packets {
let missing_frame: Option<OpusPacket> = None;
let missing_frame: Option<OpusPacket<'_>> = None;
let dest_samples = (&mut out[..])
.try_into()
.expect("Decode logic will cap decode buffer size at i32::MAX.");
Expand Down