Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make publish_data wait until the DataChannel's bufferedAmount becomes low. #545

Merged
merged 23 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e502e30
expose buffered_amount method to Rust
typester Jan 9, 2025
7a4be4f
test to implement wait_for_dc_buffer_low
typester Jan 10, 2025
ee64923
remove wait_for_low function, add functionality to wait it in publish…
typester Jan 13, 2025
82d6298
test FFI implementation
typester Jan 13, 2025
fb93565
add callback
typester Jan 13, 2025
cb4e34c
revert unused changes
typester Jan 13, 2025
3a2fb5e
not necessary to make this async
typester Jan 14, 2025
6e0cd81
update lock
typester Jan 14, 2025
8887378
add nanpa changeset
typester Jan 14, 2025
a69387b
create dc_task for more reliable data publishing
typester Jan 15, 2025
d9cb564
change get/set dc buffered_amount_low_threshold FFI functions to supp…
typester Jan 15, 2025
5c7731e
fmt
typester Jan 15, 2025
ff09a80
add logs if buffer amount become unexpected value
typester Jan 15, 2025
5a3773d
Merge remote-tracking branch 'origin/main' into typester/data-stream
typester Jan 16, 2025
ab0bc9c
set default threshold to 2MB
typester Jan 16, 2025
08f5f5c
fmt
typester Jan 16, 2025
0aad6d1
ignore error here
typester Jan 16, 2025
ca69743
add buffered_amount_low_threshold in RoomInfo
typester Jan 16, 2025
8e5e9d8
remove Get ffi function for dc buffered_low_threshold, instead, add i…
typester Jan 16, 2025
1a42a4f
update changeset
typester Jan 16, 2025
6c73254
flatten DataChannelOptions in protobuf
typester Jan 16, 2025
88a623e
fmt
typester Jan 16, 2025
ae7d1df
Merge remote-tracking branch 'origin/main' into typester/data-stream
typester Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .nanpa/dc-buffered-amount-low-threshold.kdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
patch type="added" package="libwebrtc" "Expose DataChannel.bufferedAmount property"
patch type="fixed" package="livekit" "Wait for the buffered amount to become low before sending data during publish_data for Reliable Data Channel"
patch type="added" package="livekit" "Add an API to set buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit" "Update RoomInfo to contain buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit-ffi" "Add an API to set buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit-ffi" "Update RoomInfo to contain buffer_amount_low_threshold for DataChannel"
4 changes: 4 additions & 0 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl DataChannel {
self.handle.close()
}

pub fn buffered_amount(&self) -> u64 {
self.handle.buffered_amount()
}

pub fn on_state_change(&self, callback: Option<OnStateChange>) {
self.handle.on_state_change(callback)
}
Expand Down
4 changes: 4 additions & 0 deletions libwebrtc/src/native/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl DataChannel {
self.sys_handle.close();
}

pub fn buffered_amount(&self) -> u64 {
self.sys_handle.buffered_amount()
}

pub fn on_state_change(&self, handler: Option<OnStateChange>) {
*self.observer.state_change_handler.lock() = handler;
}
Expand Down
6 changes: 6 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ message FfiRequest {
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;
SendStreamTrailerRequest send_stream_trailer = 46;

// Data Channel
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 47;
}
}

Expand Down Expand Up @@ -178,6 +181,9 @@ message FfiResponse {
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
SendStreamTrailerResponse send_stream_trailer = 45;

// Data Channel
SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 46;
}
}

Expand Down
17 changes: 17 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,16 @@ message RoomEvent {
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
DataStreamTrailerReceived stream_trailer_received = 32;
DataChannelBufferedAmountLowThresholdChanged data_channel_low_threshold_changed = 33;
}
}

message RoomInfo {
optional string sid = 1;
required string name = 2;
required string metadata = 3;
required uint64 lossy_dc_buffered_amount_low_threshold = 4;
required uint64 reliable_dc_buffered_amount_low_threshold = 5;
}

message OwnedRoom {
Expand Down Expand Up @@ -647,3 +650,17 @@ message SendStreamTrailerCallback {
required uint64 async_id = 1;
optional string error = 2;
}

message SetDataChannelBufferedAmountLowThresholdRequest {
required uint64 local_participant_handle = 1;
required uint64 threshold = 2;
required DataPacketKind kind = 3;
}

message SetDataChannelBufferedAmountLowThresholdResponse {
}

message DataChannelBufferedAmountLowThresholdChanged {
required DataPacketKind kind = 1;
required uint64 threshold = 2;
}
6 changes: 6 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ impl From<&FfiRoom> for proto::RoomInfo {
sid: room.maybe_sid().map(|x| x.to_string()),
name: room.name(),
metadata: room.metadata(),
lossy_dc_buffered_amount_low_threshold: room
.data_channel_options(DataPacketKind::Lossy)
.buffered_amount_low_threshold,
reliable_dc_buffered_amount_low_threshold: room
.data_channel_options(DataPacketKind::Reliable)
.buffered_amount_low_threshold,
}
}
}
Expand Down
41 changes: 37 additions & 4 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -2635,7 +2634,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, required, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2707,6 +2706,8 @@ pub mod room_event {
StreamChunkReceived(super::DataStreamChunkReceived),
#[prost(message, tag="32")]
StreamTrailerReceived(super::DataStreamTrailerReceived),
#[prost(message, tag="33")]
DataChannelLowThresholdChanged(super::DataChannelBufferedAmountLowThresholdChanged),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -2718,6 +2719,10 @@ pub struct RoomInfo {
pub name: ::prost::alloc::string::String,
#[prost(string, required, tag="3")]
pub metadata: ::prost::alloc::string::String,
#[prost(uint64, required, tag="4")]
pub lossy_dc_buffered_amount_low_threshold: u64,
#[prost(uint64, required, tag="5")]
pub reliable_dc_buffered_amount_low_threshold: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3217,6 +3222,28 @@ pub struct SendStreamTrailerCallback {
#[prost(string, optional, tag="2")]
pub error: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
#[prost(enumeration="DataPacketKind", required, tag="3")]
pub kind: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataChannelBufferedAmountLowThresholdChanged {
#[prost(enumeration="DataPacketKind", required, tag="1")]
pub kind: i32,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down Expand Up @@ -3989,7 +4016,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -4094,13 +4121,16 @@ pub mod ffi_request {
SendStreamChunk(super::SendStreamChunkRequest),
#[prost(message, tag="46")]
SendStreamTrailer(super::SendStreamTrailerRequest),
/// Data Channel
#[prost(message, tag="47")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -4203,6 +4233,9 @@ pub mod ffi_response {
SendStreamChunk(super::SendStreamChunkResponse),
#[prost(message, tag="45")]
SendStreamTrailer(super::SendStreamTrailerResponse),
/// Data Channel
#[prost(message, tag="46")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down
19 changes: 19 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,20 @@ fn on_rpc_method_invocation_response(
Ok(proto::RpcMethodInvocationResponseResponse { error })
}

fn on_set_data_channel_buffered_amount_low_threshold(
server: &'static FfiServer,
set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> FfiResult<proto::SetDataChannelBufferedAmountLowThresholdResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(
set_data_channel_buffered_amount_low_threshold.local_participant_handle,
)?
.clone();
Ok(ffi_participant.room.set_data_channel_buffered_amount_low_threshold(
set_data_channel_buffered_amount_low_threshold,
))
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -1078,6 +1092,11 @@ pub fn handle_request(
server, request,
)?)
}
proto::ffi_request::Message::SetDataChannelBufferedAmountLowThreshold(request) => {
proto::ffi_response::Message::SetDataChannelBufferedAmountLowThreshold(
on_set_data_channel_buffered_amount_low_threshold(server, request)?,
)
}
});

Ok(res)
Expand Down
19 changes: 19 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,17 @@ impl RoomInner {
) -> Option<oneshot::Sender<Result<String, RpcError>>> {
return self.rpc_method_invocation_waiters.lock().remove(&invocation_id);
}

pub fn set_data_channel_buffered_amount_low_threshold(
&self,
request: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> proto::SetDataChannelBufferedAmountLowThresholdResponse {
let _ = self.room.local_participant().set_data_channel_buffered_amount_low_threshold(
request.threshold,
request.kind().into(),
);
proto::SetDataChannelBufferedAmountLowThresholdResponse {}
}
}

// Task used to publish data without blocking the client thread
Expand Down Expand Up @@ -1246,6 +1257,14 @@ async fn forward_event(
proto::DataStreamTrailerReceived { trailer: trailer.into(), participant_identity },
));
}
RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
let _ = send_event(proto::room_event::Message::DataChannelLowThresholdChanged(
proto::DataChannelBufferedAmountLowThresholdChanged {
kind: proto::DataPacketKind::from(kind).into(),
threshold,
},
));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
Expand Down
49 changes: 48 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
prelude::*,
rtc_engine::{
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
SessionStats,
SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
},
};

Expand Down Expand Up @@ -196,6 +196,10 @@ pub enum RoomEvent {
},
Reconnecting,
Reconnected,
DataChannelBufferedAmountLowThresholdChanged {
kind: DataPacketKind,
threshold: u64,
},
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -360,6 +364,19 @@ impl Debug for Room {
struct RoomInfo {
metadata: String,
state: ConnectionState,
lossy_dc_options: DataChannelOptions,
reliable_dc_options: DataChannelOptions,
}

#[derive(Clone)]
pub struct DataChannelOptions {
pub buffered_amount_low_threshold: u64,
}

impl Default for DataChannelOptions {
fn default() -> Self {
Self { buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD }
}
}

pub(crate) struct RoomSession {
Expand Down Expand Up @@ -506,6 +523,8 @@ impl Room {
info: RwLock::new(RoomInfo {
state: ConnectionState::Disconnected,
metadata: room_info.metadata,
lossy_dc_options: Default::default(),
reliable_dc_options: Default::default(),
}),
remote_participants: Default::default(),
active_speakers: Default::default(),
Expand Down Expand Up @@ -623,6 +642,13 @@ impl Room {
pub fn e2ee_manager(&self) -> &E2eeManager {
&self.inner.e2ee_manager
}

pub fn data_channel_options(&self, kind: DataPacketKind) -> DataChannelOptions {
match kind {
DataPacketKind::Lossy => self.inner.info.read().lossy_dc_options.clone(),
DataPacketKind::Reliable => self.inner.info.read().reliable_dc_options.clone(),
}
}
}

impl RoomSession {
Expand Down Expand Up @@ -741,6 +767,9 @@ impl RoomSession {
EngineEvent::DataStreamTrailer { trailer, participant_identity } => {
self.handle_data_stream_trailer(trailer, participant_identity);
}
EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
self.handle_data_channel_buffered_low_threshold_change(kind, threshold);
}
_ => {}
}

Expand Down Expand Up @@ -1278,6 +1307,24 @@ impl RoomSession {
self.dispatcher.dispatch(&event);
}

fn handle_data_channel_buffered_low_threshold_change(
&self,
kind: DataPacketKind,
threshold: u64,
) {
let mut info = self.info.write();
match kind {
DataPacketKind::Lossy => {
info.lossy_dc_options.buffered_amount_low_threshold = threshold;
}
DataPacketKind::Reliable => {
info.reliable_dc_options.buffered_amount_low_threshold = threshold;
}
}
let event = RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold };
self.dispatcher.dispatch(&event);
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
Loading
Loading