test to implement wait_for_dc_buffer_low
typester committed Jan 10, 2025
1 parent e502e30 commit 7a4be4f
Showing 6 changed files with 135 additions and 7 deletions.
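
In short: this commit adds backpressure support for reliable data publishing. A new LocalParticipant::wait_for_dc_buffer_low returns immediately when the reliable data channel's buffered amount is at or below the configured threshold, and otherwise parks a oneshot waiter that is resolved by the buffered-amount events plumbed up from the RTC session. A hypothetical calling sketch, not part of the diff below — the Room setup, DataPacket fields, and payload loop are assumptions used only for illustration:

    use livekit::prelude::*;

    // Illustrative helper: throttle reliable publishes against the data channel buffer.
    async fn publish_with_backpressure(room: &Room, payloads: Vec<Vec<u8>>) -> RoomResult<()> {
        for payload in payloads {
            // Resolves immediately if the buffered amount is already at/below the threshold;
            // errors if another wait is already in progress.
            room.local_participant().wait_for_dc_buffer_low().await?;
            room.local_participant()
                .publish_data(DataPacket { payload, reliable: true, ..Default::default() })
                .await?;
        }
        Ok(())
    }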
6 changes: 6 additions & 0 deletions livekit/src/room/mod.rs
@@ -734,6 +734,12 @@ impl RoomSession {
            EngineEvent::DataStreamChunk { chunk, participant_identity } => {
                self.handle_data_stream_chunk(chunk, participant_identity);
            }
            EngineEvent::DataChannelBufferedAmountChanged { buffered_amount } => {
                let local_participant = self.local_participant.clone();
                livekit_runtime::spawn(async move {
                    local_participant.handle_dc_buffered_amount_changed(buffered_amount).await;
                });
            }
            _ => {}
        }

56 changes: 55 additions & 1 deletion livekit/src/room/participant/local_participant.rs
@@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration};
use std::{
    collections::HashMap,
    fmt::Debug,
    pin::Pin,
    sync::{atomic::AtomicU64, Arc},
    time::Duration,
};

use super::{ConnectionQuality, ParticipantInner, ParticipantKind};
use crate::{
@@ -71,6 +77,8 @@ struct LocalInfo {
    events: LocalEvents,
    encryption_type: EncryptionType,
    rpc_state: Mutex<RpcState>,
    dc_buffered_amount_low_threshold: AtomicU64,
    dc_buffered_amount_low_tx: Mutex<Option<oneshot::Sender<()>>>,
}

#[derive(Clone)]
@@ -106,6 +114,8 @@ impl LocalParticipant {
                events: LocalEvents::default(),
                encryption_type,
                rpc_state: Mutex::new(RpcState::new()),
                dc_buffered_amount_low_threshold: Default::default(),
                dc_buffered_amount_low_tx: Default::default(),
            }),
        }
    }
@@ -471,6 +481,50 @@ impl LocalParticipant {
        self.inner.rtc_engine.publish_data(&data, kind).await.map_err(Into::into)
    }

    pub async fn wait_for_dc_buffer_low(&self) -> RoomResult<()> {
        let threshold =
            self.local.dc_buffered_amount_low_threshold.load(std::sync::atomic::Ordering::Relaxed);
        let amount =
            self.inner.rtc_engine.session().data_channel_buffered_amount(DataPacketKind::Reliable);

        if amount <= threshold {
            return Ok(());
        }

        // Wait until the buffered amount becomes low.
        let rx = {
            let (tx, rx) = oneshot::channel();

            let mut low_tx = self.local.dc_buffered_amount_low_tx.lock();
            if low_tx.is_some() {
                return Err(RoomError::Request {
                    reason: Reason::NotAllowed,
                    message: "Another wait request is already in progress.".into(),
                });
            }
            *low_tx = Some(tx);
            rx
        };

        match rx.await {
            Ok(()) => Ok(()),
            Err(err) => Err(RoomError::Internal(format!("failed to wait: {}", err))),
        }
    }

    pub(crate) async fn handle_dc_buffered_amount_changed(&self, buffered_amount: u64) {
        let threshold =
            self.local.dc_buffered_amount_low_threshold.load(std::sync::atomic::Ordering::Relaxed);
        if buffered_amount > threshold {
            return;
        }
        let Some(tx) = self.local.dc_buffered_amount_low_tx.lock().take() else {
            return;
        };
        log::debug!(
            "resolving wait_for_dc_buffer_low: buffered={}, threshold={}",
            buffered_amount,
            threshold
        );
        let _ = tx.send(());
    }

    pub async fn publish_transcription(&self, packet: Transcription) -> RoomResult<()> {
        let segments: Vec<proto::TranscriptionSegment> = packet
            .segments
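
Note on the design above: the waiter is a single-slot mechanism — the threshold lives in an AtomicU64 and at most one oneshot sender can be parked in dc_buffered_amount_low_tx, so a second concurrent wait_for_dc_buffer_low call fails with Reason::NotAllowed instead of queueing. The same pattern in isolation, as a standalone sketch (the BufferLowWaiter type and its method names are illustrative, not part of the library):

    use std::sync::atomic::{AtomicU64, Ordering};

    use parking_lot::Mutex;
    use tokio::sync::oneshot;

    #[derive(Default)]
    struct BufferLowWaiter {
        threshold: AtomicU64,
        tx: Mutex<Option<oneshot::Sender<()>>>,
    }

    impl BufferLowWaiter {
        // Register a waiter; returns None if another wait is already in progress.
        fn register(&self) -> Option<oneshot::Receiver<()>> {
            let mut slot = self.tx.lock();
            if slot.is_some() {
                return None;
            }
            let (tx, rx) = oneshot::channel();
            *slot = Some(tx);
            Some(rx)
        }

        // Called on every buffered-amount change; wakes the waiter once the buffer is low enough.
        fn on_buffered_amount(&self, amount: u64) {
            if amount > self.threshold.load(Ordering::Relaxed) {
                return;
            }
            if let Some(tx) = self.tx.lock().take() {
                let _ = tx.send(());
            }
        }
    }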
1 change: 1 addition & 0 deletions livekit/src/room/participant/mod.rs
@@ -17,6 +17,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
use livekit_protocol as proto;
use livekit_protocol::enum_dispatch;
use parking_lot::{Mutex, RwLock};
use tokio::sync::oneshot;

use crate::{prelude::*, rtc_engine::RtcEngine};

8 changes: 8 additions & 0 deletions livekit/src/rtc_engine/mod.rs
@@ -167,6 +167,9 @@ pub enum EngineEvent {
        chunk: proto::data_stream::Chunk,
        participant_identity: String,
    },
    DataChannelBufferedAmountChanged {
        buffered_amount: u64,
    },
}

/// Represents a running RtcSession with the ability to close the session
@@ -542,6 +545,11 @@ impl EngineInner {
                    .engine_tx
                    .send(EngineEvent::DataStreamChunk { chunk, participant_identity });
            }
            SessionEvent::DataChannelBufferedAmountChanged { buffered_amount } => {
                let _ = self
                    .engine_tx
                    .send(EngineEvent::DataChannelBufferedAmountChanged { buffered_amount });
            }
        }
        Ok(())
    }
33 changes: 30 additions & 3 deletions livekit/src/rtc_engine/rtc_events.rs
@@ -17,7 +17,7 @@ use livekit_protocol as proto;
use tokio::sync::mpsc;

use super::peer_transport::PeerTransport;
use crate::rtc_engine::peer_transport::OnOfferCreated;
use crate::{rtc_engine::peer_transport::OnOfferCreated, DataPacketKind};

pub type RtcEmitter = mpsc::UnboundedSender<RtcEvent>;
pub type RtcEvents = mpsc::UnboundedReceiver<RtcEvent>;
@@ -51,6 +51,14 @@ pub enum RtcEvent {
        data: Vec<u8>,
        binary: bool,
    },
    DataChannelStateChange {
        state: DataChannelState,
    },
    DataChannelBufferedAmountChange {
        sent: u64,
        amount: u64,
        kind: DataPacketKind,
    },
}

/// Handlers used to forward events to a channel
@@ -141,6 +149,25 @@ fn on_message(emitter: RtcEmitter) -> rtc::data_channel::OnMessage {
    })
}

pub fn forward_dc_events(dc: &mut DataChannel, rtc_emitter: RtcEmitter) {
    dc.on_message(Some(on_message(rtc_emitter)));
fn on_state_change(emitter: RtcEmitter) -> rtc::data_channel::OnStateChange {
    Box::new(move |state| {
        let _ = emitter.send(RtcEvent::DataChannelStateChange { state });
    })
}

fn on_buffered_amount_change(
    emitter: RtcEmitter,
    dc: DataChannel,
    kind: DataPacketKind,
) -> rtc::data_channel::OnBufferedAmountChange {
    Box::new(move |sent| {
        let amount = dc.buffered_amount();
        let _ = emitter.send(RtcEvent::DataChannelBufferedAmountChange { sent, amount, kind });
    })
}

pub fn forward_dc_events(dc: &mut DataChannel, kind: DataPacketKind, rtc_emitter: RtcEmitter) {
    dc.on_message(Some(on_message(rtc_emitter.clone())));
    dc.on_state_change(Some(on_state_change(rtc_emitter.clone())));
    dc.on_buffered_amount_change(Some(on_buffered_amount_change(rtc_emitter, dc.clone(), kind)));
}
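
A brief note on the callback wiring above: the buffered-amount callback receives a single u64 (named sent here) rather than the current total, so the handler captures a clone of the DataChannel, reads its current buffered_amount(), and forwards both values tagged with the channel's DataPacketKind. That is why forward_dc_events now takes the kind as an argument, and why both call sites in rtc_session.rs below are updated.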
38 changes: 35 additions & 3 deletions livekit/src/rtc_engine/rtc_session.rs
@@ -18,7 +18,7 @@ use std::{
    fmt::Debug,
    ops::Not,
    sync::{
        atomic::{AtomicBool, Ordering},
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    time::Duration,
@@ -143,6 +143,9 @@ pub enum SessionEvent {
        chunk: proto::data_stream::Chunk,
        participant_identity: String,
    },
    DataChannelBufferedAmountChanged {
        buffered_amount: u64,
    },
}

#[derive(Serialize, Deserialize)]
@@ -166,7 +169,9 @@ struct SessionInner {
    // Publisher data channels
    // used to send data to other participants (The SFU forwards the messages)
    lossy_dc: DataChannel,
    lossy_dc_buffered_amount: AtomicU64,
    reliable_dc: DataChannel,
    reliable_dc_buffered_amount: AtomicU64,

    // Keep a strong reference to the subscriber datachannels,
    // so we can receive data from other participants
@@ -247,8 +252,8 @@ impl RtcSession {
        // Forward events received inside the signaling thread to our rtc channel
        rtc_events::forward_pc_events(&mut publisher_pc, rtc_emitter.clone());
        rtc_events::forward_pc_events(&mut subscriber_pc, rtc_emitter.clone());
        rtc_events::forward_dc_events(&mut lossy_dc, rtc_emitter.clone());
        rtc_events::forward_dc_events(&mut reliable_dc, rtc_emitter);
        rtc_events::forward_dc_events(&mut lossy_dc, DataPacketKind::Lossy, rtc_emitter.clone());
        rtc_events::forward_dc_events(&mut reliable_dc, DataPacketKind::Reliable, rtc_emitter);

        let (close_tx, close_rx) = watch::channel(false);
        let inner = Arc::new(SessionInner {
@@ -258,7 +263,9 @@ impl RtcSession {
            subscriber_pc,
            pending_tracks: Default::default(),
            lossy_dc,
            lossy_dc_buffered_amount: Default::default(),
            reliable_dc,
            reliable_dc_buffered_amount: Default::default(),
            sub_lossy_dc: Mutex::new(None),
            sub_reliable_dc: Mutex::new(None),
            closed: Default::default(),
@@ -370,6 +377,15 @@ impl RtcSession {
        self.inner.data_channel(target, kind)
    }

    pub fn data_channel_buffered_amount(&self, kind: DataPacketKind) -> u64 {
        match kind {
            DataPacketKind::Lossy => self.inner.lossy_dc_buffered_amount.load(Ordering::Relaxed),
            DataPacketKind::Reliable => {
                self.inner.reliable_dc_buffered_amount.load(Ordering::Relaxed)
            }
        }
    }

    pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse {
        self.inner.get_response(request_id).await
    }
@@ -747,6 +763,22 @@ impl SessionInner {
                    }
                }
            }
            RtcEvent::DataChannelStateChange { .. } => {
                // Data channel state changes are not handled yet.
            }
            RtcEvent::DataChannelBufferedAmountChange { sent: _, amount, kind } => {
                match kind {
                    DataPacketKind::Lossy => {
                        self.lossy_dc_buffered_amount.store(amount, Ordering::Relaxed)
                    }
                    DataPacketKind::Reliable => {
                        self.reliable_dc_buffered_amount.store(amount, Ordering::Relaxed);
                        // Only the reliable dc needs to emit this event for now.
                        let _ = self.emitter.send(SessionEvent::DataChannelBufferedAmountChanged {
                            buffered_amount: amount,
                        });
                    }
                }
            }
        }

        Ok(())
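
Putting the pieces together, the buffered-amount signal travels: DataChannel callback (rtc_events::forward_dc_events, now parameterized by DataPacketKind) → RtcEvent::DataChannelBufferedAmountChange → RtcSession, which stores the amount in a per-channel AtomicU64 and, for the reliable channel only, emits SessionEvent::DataChannelBufferedAmountChanged → EngineEvent::DataChannelBufferedAmountChanged → RoomSession, which spawns a task calling LocalParticipant::handle_dc_buffered_amount_changed to wake any parked wait_for_dc_buffer_low caller.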
