From 711f14a7a053f9d3f231663ea4b79b4bc4ddcbf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Wed, 6 Apr 2022 15:29:49 +0200 Subject: [PATCH 01/19] Enhance API for configuring simulcast --- assets/js/const.ts | 8 ++++---- assets/js/membraneWebRTC.ts | 41 +++++++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/assets/js/const.ts b/assets/js/const.ts index 815e6eaf6..bd231a46a 100644 --- a/assets/js/const.ts +++ b/assets/js/const.ts @@ -1,6 +1,6 @@ // const TEMPORAL_LAYERS_COUNT = 2; -export const simulcastConfig: RTCRtpTransceiverInit = { +export const simulcastTransceiverConfig: RTCRtpTransceiverInit = { direction: "sendonly", // keep this array from low resolution to high resolution // in other case lower resolution encoding can get @@ -8,19 +8,19 @@ export const simulcastConfig: RTCRtpTransceiverInit = { sendEncodings: [ { rid: "l", - active: true, + active: false, // maxBitrate: 4_000_000, scaleResolutionDownBy: 4.0, // scalabilityMode: "L1T" + TEMPORAL_LAYERS_COUNT, }, { rid: "m", - active: true, + active: false, scaleResolutionDownBy: 2.0, }, { rid: "h", - active: true, + active: false, // maxBitrate: 4_000_000, // scalabilityMode: "L1T" + TEMPORAL_LAYERS_COUNT, }, diff --git a/assets/js/membraneWebRTC.ts b/assets/js/membraneWebRTC.ts index 5e64f9718..227425524 100644 --- a/assets/js/membraneWebRTC.ts +++ b/assets/js/membraneWebRTC.ts @@ -7,7 +7,7 @@ import { serializeMediaEvent, } from "./mediaEvent"; import { v4 as uuidv4 } from "uuid"; -import { simulcastConfig } from "./const"; +import { simulcastTransceiverConfig } from "./const"; /** * Interface describing Peer. @@ -41,6 +41,14 @@ export interface MembraneWebRTCConfig { rtcConfig?: RTCConfiguration; } +/** + * Simulcast configuration passed to {@link addTrack}. + */ +export interface SimulcastConfig { + enabled: boolean; + active_encodings: TrackEncoding[]; +} + /** * Track's context i.e. all data that can be useful when operating on track. */ @@ -60,9 +68,9 @@ export interface TrackContext { */ trackId: string; /** - * Flag indicating whether track is a simulcast one or not. + * Simulcast configuration. */ - isSimulcast: boolean; + simulcastConfig: SimulcastConfig; /** * Any info that was passed in {@link addTrack}. */ @@ -74,9 +82,9 @@ export interface TrackContext { /** * Type describing possible track encodings. * At the moment, if track was added as a simulcast one ({@link addTrack}) - * it will be transmitted to the server in two versions - low and high. + * it will be transmitted to the server in three versions - low, medium and high. */ -export type TrackEncoding = "l" | "h"; +export type TrackEncoding = "l" | "m" | "h"; /** * Callbacks that has to be implemented by user. @@ -292,7 +300,7 @@ export class MembraneWebRTC { stream: null, track: null, trackId, - isSimulcast: false, + simulcastConfig: { enabled: false, active_encodings: [] }, metadata, peer, maxBandwidth: 0, @@ -448,7 +456,7 @@ export class MembraneWebRTC { track: MediaStreamTrack, stream: MediaStream, trackMetadata: any = new Map(), - isSimulcast: boolean = false, + simulcastConfig: SimulcastConfig = { enabled: false, active_encodings: [] }, maxBandwidth: BandwidthLimit = 0 // unlimited bandwidth ): string { if (this.getPeerId() === "") throw "Cannot add tracks before being accepted by the server"; @@ -462,7 +470,7 @@ export class MembraneWebRTC { trackId, peer: this.localPeer, metadata: trackMetadata, - isSimulcast, + simulcastConfig, maxBandwidth, }; this.localTrackIdToTrack.set(trackId, trackContext); @@ -488,8 +496,17 @@ export class MembraneWebRTC { const track = trackContext.track!!; let transceiverConfig: RTCRtpTransceiverInit; - if (trackContext.isSimulcast) { - transceiverConfig = track.kind === "audio" ? { direction: "sendonly" } : simulcastConfig; + if (trackContext.simulcastConfig.enabled) { + console.log(trackContext); + transceiverConfig = + track.kind === "audio" ? { direction: "sendonly" } : simulcastTransceiverConfig; + transceiverConfig.sendEncodings?.forEach((encoding) => { + if ( + trackContext.simulcastConfig.active_encodings.includes(encoding.rid! as TrackEncoding) + ) { + encoding.active = true; + } + }); this.disabledTrackEncodings.set(trackContext.trackId, []); } else { transceiverConfig = { @@ -506,6 +523,8 @@ export class MembraneWebRTC { } } + console.log(transceiverConfig); + this.connection!.addTransceiver(track, transceiverConfig); }; @@ -993,7 +1012,7 @@ export class MembraneWebRTC { peer: peer, trackId, metadata, - isSimulcast: false, + simulcastConfig: { enabled: false, active_encodings: [] }, }; this.trackIdToTrack.set(trackId, trackContext); From b7ae259c6bbe92756dfd62c630c517d5aaf2c17d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Wed, 6 Apr 2022 17:53:45 +0200 Subject: [PATCH 02/19] Remove unused `update?` field in SimulcastTee --- lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex index b1e8d2d6a..1f23d828d 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex @@ -57,8 +57,7 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do "l" => EncodingTracker.new("l"), "m" => EncodingTracker.new("m"), "h" => EncodingTracker.new("h") - }, - update?: false + } }} end From 8279c8acd68150b7a97d8afd666787a7535a1571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Wed, 6 Apr 2022 19:12:14 +0200 Subject: [PATCH 03/19] Fix switching between encodings in Forwarder --- .../endpoints/webrtc/forwarder.ex | 73 +++++++++++++------ .../endpoints/webrtc/simulcast_tee.ex | 62 ++++++++-------- lib/membrane_rtc_engine/engine.ex | 2 +- .../webrtc/forwarder_test.exs | 72 +++++++++++++++++- 4 files changed, 153 insertions(+), 56 deletions(-) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/forwarder.ex b/lib/membrane_rtc_engine/endpoints/webrtc/forwarder.ex index 57d9d691c..9b665c24b 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/forwarder.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/forwarder.ex @@ -24,31 +24,52 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.Forwarder do old_encoding: String.t() | nil, rtp_munger: RTPMunger.t(), vp8_munger: VP8Munger.t(), + encodings: [String.t()], active_encodings: [String.t()], started?: boolean() } - @enforce_keys [:codec, :rtp_munger] + @enforce_keys [:codec, :selected_encoding, :encodings, :active_encodings, :rtp_munger] defstruct @enforce_keys ++ [ :queued_encoding, :old_encoding, :vp8_munger, - selected_encoding: "h", - active_encodings: ["h", "m", "l"], started?: false ] @doc """ Creates a new forwarder. + + * `encodings` - a list of possible encodings. + * `selected_encoding` - encoding to forward. If not provided, + the highest possible encoding will be choosen. """ - @spec new(:H264 | :VP8, Membrane.RTP.clock_rate_t()) :: t() - def new(:VP8, clock_rate) do - %__MODULE__{codec: :VP8, rtp_munger: RTPMunger.new(clock_rate), vp8_munger: VP8Munger.new()} + @spec new(:H264 | :VP8, Membrane.RTP.clock_rate_t(), [String.t()], String.t() | nil) :: t() + def new(codec, clock_rate, encodings, selected_encoding \\ nil) + + def new(:VP8, clock_rate, encodings, selected_encoding) do + %__MODULE__{ + codec: :VP8, + rtp_munger: RTPMunger.new(clock_rate), + vp8_munger: VP8Munger.new(), + encodings: encodings, + # assume that, at the beginning, all encodings are active + # if some encoding is inactive, we will be notified + # about this in `encoding_inactive` function + active_encodings: encodings, + selected_encoding: selected_encoding || get_next_encoding(encodings) + } end - def new(:H264, clock_rate) do - %__MODULE__{codec: :H264, rtp_munger: RTPMunger.new(clock_rate)} + def new(:H264, clock_rate, encodings, selected_encoding) do + %__MODULE__{ + codec: :H264, + rtp_munger: RTPMunger.new(clock_rate), + encodings: encodings, + active_encodings: encodings, + selected_encoding: selected_encoding || get_next_encoding(encodings) + } end @doc """ @@ -59,26 +80,26 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.Forwarder do active again. """ @spec encoding_inactive(t(), String.t()) :: t() - def encoding_inactive( - %__MODULE__{selected_encoding: encoding, old_encoding: nil} = forwarder, - encoding - ) do - forwarder = %__MODULE__{ - forwarder - | old_encoding: encoding, - active_encodings: List.delete(forwarder.active_encodings, encoding) - } - - do_select_encoding(forwarder, get_next_encoding(forwarder.active_encodings)) - end - def encoding_inactive(forwarder, encoding) do forwarder = %__MODULE__{ forwarder | active_encodings: List.delete(forwarder.active_encodings, encoding) } - do_select_encoding(forwarder, List.first(forwarder.active_encodings)) + cond do + # if this is currently used and selected encoding + forwarder.selected_encoding == encoding and forwarder.old_encoding == nil -> + forwarder = %__MODULE__{forwarder | old_encoding: encoding} + do_select_encoding(forwarder, get_next_encoding(forwarder.active_encodings)) + + # if this is currently used encoding but it wasn't explicitly selected + # i.e. we switched to it automatically + forwarder.selected_encoding == encoding -> + do_select_encoding(forwarder, get_next_encoding(forwarder.active_encodings)) + + true -> + forwarder + end end @doc """ @@ -101,7 +122,13 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.Forwarder do forwarder = %__MODULE__{forwarder | old_encoding: nil} do_select_encoding(forwarder, encoding) - # if we don't have any active encoding + # if we are waiting for selected encoding to become + # active again, try to select a new encoding + # it might be better than currently used + forwarder.old_encoding != nil -> + do_select_encoding(forwarder, get_next_encoding(forwarder.active_encodings)) + + # if we didn't have any active encoding forwarder.selected_encoding == nil -> do_select_encoding(forwarder, get_next_encoding(forwarder.active_encodings)) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex index 1f23d828d..f117a41d6 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex @@ -9,15 +9,10 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do @supported_codecs [:H264, :VP8] - def_options codec: [ - type: :atom, - spec: [:H264 | :VP8], - description: "Codec of track #{inspect(__MODULE__)} will forward." - ], - clock_rate: [ - type: :integer, - spec: Membrane.RTP.clock_rate_t(), - description: "Clock rate of track #{inspect(__MODULE__)} will forward." + def_options track: [ + type: :struct, + spec: Membrane.RTC.Engine.Track.t(), + description: "Track this tee is going to forward to other endpoints" ] def_input_pad :input, @@ -39,25 +34,21 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do @impl true def handle_init(opts) do - codec = opts.codec - - if codec not in @supported_codecs do + if opts.track.encoding not in @supported_codecs do raise(""" - #{inspect(__MODULE__)} does not support codec #{inspect(codec)}. + #{inspect(__MODULE__)} does not support codec #{inspect(opts.track.encoding)}. Supported codecs: #{inspect(@supported_codecs)} """) end + trackers = Map.new(opts.track.simulcast_encodings, &{&1, EncodingTracker.new(&1)}) + {:ok, %{ - codec: codec, - clock_rate: opts.clock_rate, + track: opts.track, forwarders: %{}, - trackers: %{ - "l" => EncodingTracker.new("l"), - "m" => EncodingTracker.new("m"), - "h" => EncodingTracker.new("h") - } + trackers: trackers, + inactive_encodings: [] }} end @@ -68,9 +59,15 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do @impl true def handle_pad_added(Pad.ref(:output, {:endpoint, endpoint_id}), _context, state) do - state = - put_in(state, [:forwarders, endpoint_id], Forwarder.new(state.codec, state.clock_rate)) + forwarder = + Forwarder.new(state.track.encoding, state.track.clock_rate, state.track.simulcast_encodings) + + forwarder = + Enum.reduce(state.inactive_encodings, forwarder, fn encoding, forwarder -> + Forwarder.encoding_inactive(forwarder, encoding) + end) + state = put_in(state, [:forwarders, endpoint_id], forwarder) {:ok, state} end @@ -127,18 +124,25 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do {:ok, tracker} -> put_in(state, [:trackers, rid], tracker) - {:status_changed, tracker, new_status} -> - func = - if new_status == :inactive, - do: &Forwarder.encoding_inactive(&1, rid), - else: &Forwarder.encoding_active(&1, rid) + {:status_changed, tracker, :active} -> + state = + Enum.reduce(state.forwarders, state, fn {endpoint_id, forwarder}, state -> + put_in(state, [:forwarders, endpoint_id], Forwarder.encoding_active(forwarder, rid)) + end) + state + |> update_in([:inactive_encodings], &List.delete(&1, rid)) + |> put_in([:trackers, rid], tracker) + + {:status_changed, tracker, :inactive} -> state = Enum.reduce(state.forwarders, state, fn {endpoint_id, forwarder}, state -> - put_in(state, [:forwarders, endpoint_id], func.(forwarder)) + put_in(state, [:forwarders, endpoint_id], Forwarder.encoding_inactive(forwarder, rid)) end) - put_in(state, [:trackers, rid], tracker) + state + |> update_in([:inactive_encodings], &[rid | &1]) + |> put_in([:trackers, rid], tracker) end end diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index 92267dfcb..eea9e912c 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -964,7 +964,7 @@ defmodule Membrane.RTC.Engine do tee = cond do rid != nil -> - %SimulcastTee{codec: track.encoding, clock_rate: track.clock_rate} + %SimulcastTee{track: track} state.display_manager != nil -> %Engine.Tee{ets_name: state.id, track_id: track_id, type: track.type} diff --git a/test/membrane_rtc_engine/webrtc/forwarder_test.exs b/test/membrane_rtc_engine/webrtc/forwarder_test.exs index c0732aaf1..af6724032 100644 --- a/test/membrane_rtc_engine/webrtc/forwarder_test.exs +++ b/test/membrane_rtc_engine/webrtc/forwarder_test.exs @@ -4,14 +4,22 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.ForwarderTest do alias Membrane.RTC.Engine.Endpoint.WebRTC.Forwarder test "Forwarder switches back to encoding being used before it became inactive" do - forwarder = Forwarder.new(:VP8, 90_000) + forwarder = Forwarder.new(:VP8, 90_000, ["l", "m", "h"]) + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: nil, + old_encoding: nil, + active_encodings: ["l", "m", "h"] + } = forwarder + forwarder = Forwarder.encoding_inactive(forwarder, "h") assert %Forwarder{ selected_encoding: "h", queued_encoding: "m", old_encoding: "h", - active_encodings: ["m", "l"] + active_encodings: ["l", "m"] } = forwarder forwarder = Forwarder.encoding_active(forwarder, "h") @@ -20,7 +28,65 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.ForwarderTest do selected_encoding: "h", queued_encoding: nil, old_encoding: nil, - active_encodings: ["m", "l", "h"] + active_encodings: ["l", "m", "h"] + } = forwarder + end + + test "Forwarder doesn't switch to a new encoding when not currently used encoding is marked as inactive" do + forwarder = Forwarder.new(:VP8, 90_000, ["l", "m", "h"]) + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: nil, + old_encoding: nil, + active_encodings: ["l", "m", "h"] + } = forwarder + + forwarder = Forwarder.encoding_inactive(forwarder, "m") + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: nil, + old_encoding: nil, + active_encodings: ["l", "h"] + } = forwarder + end + + test "Forwarder switches to a new encoding when it is better than currently used encoding and while waiting for the actual encoding" do + forwarder = Forwarder.new(:VP8, 90_000, ["l", "m", "h"]) + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: nil, + old_encoding: nil, + active_encodings: ["l", "m", "h"] + } = forwarder + + forwarder = Forwarder.encoding_inactive(forwarder, "m") + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: nil, + old_encoding: nil, + active_encodings: ["l", "h"] + } = forwarder + + forwarder = Forwarder.encoding_inactive(forwarder, "h") + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: "l", + old_encoding: "h", + active_encodings: ["l"] + } = forwarder + + forwarder = Forwarder.encoding_active(forwarder, "m") + + assert %Forwarder{ + selected_encoding: "h", + queued_encoding: "m", + old_encoding: "h", + active_encodings: ["l", "m"] } = forwarder end end From e8165d5c816989f23ed2380efca77f10b5e616d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 7 Apr 2022 18:19:30 +0200 Subject: [PATCH 04/19] Refactor membraneWebRTC.ts. Remember initially inactive encodings. --- assets/js/membraneWebRTC.ts | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/assets/js/membraneWebRTC.ts b/assets/js/membraneWebRTC.ts index 227425524..527ba981b 100644 --- a/assets/js/membraneWebRTC.ts +++ b/assets/js/membraneWebRTC.ts @@ -45,7 +45,15 @@ export interface MembraneWebRTCConfig { * Simulcast configuration passed to {@link addTrack}. */ export interface SimulcastConfig { + /** + * Whether to simulcast track or not. + */ enabled: boolean; + /** + * List of initially active encodings. + * Encoding that is not present in this list might still be + * enabled using {@link enableTrackEncoding}. + */ active_encodings: TrackEncoding[]; } @@ -300,7 +308,7 @@ export class MembraneWebRTC { stream: null, track: null, trackId, - simulcastConfig: { enabled: false, active_encodings: [] }, + simulcastConfig: { enabled: false, encodings: [], active_encodings: [] }, metadata, peer, maxBandwidth: 0, @@ -497,17 +505,19 @@ export class MembraneWebRTC { let transceiverConfig: RTCRtpTransceiverInit; if (trackContext.simulcastConfig.enabled) { - console.log(trackContext); transceiverConfig = track.kind === "audio" ? { direction: "sendonly" } : simulcastTransceiverConfig; + let disabledTrackEncodings: TrackEncoding[] = []; transceiverConfig.sendEncodings?.forEach((encoding) => { if ( trackContext.simulcastConfig.active_encodings.includes(encoding.rid! as TrackEncoding) ) { encoding.active = true; + } else { + disabledTrackEncodings.push(encoding.rid! as TrackEncoding); } }); - this.disabledTrackEncodings.set(trackContext.trackId, []); + this.disabledTrackEncodings.set(trackContext.trackId, disabledTrackEncodings); } else { transceiverConfig = { direction: "sendonly", @@ -522,9 +532,6 @@ export class MembraneWebRTC { transceiverConfig.sendEncodings![0].maxBitrate = trackContext.maxBandwidth * 1024; // convert to bps; } } - - console.log(transceiverConfig); - this.connection!.addTransceiver(track, transceiverConfig); }; From a43e56c5bc303e8fa5ac5c694a0a0c44a2481040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Fri, 8 Apr 2022 16:12:34 +0200 Subject: [PATCH 05/19] Add ability to set default simulcast encoding to receive --- .../endpoints/hls_endpoint.ex | 22 +- .../endpoints/webrtc/simulcast_tee.ex | 20 +- .../endpoints/webrtc_endpoint.ex | 25 +- lib/membrane_rtc_engine/engine.ex | 276 +++++++++--------- lib/membrane_rtc_engine/subscription.ex | 32 ++ 5 files changed, 202 insertions(+), 173 deletions(-) create mode 100644 lib/membrane_rtc_engine/subscription.ex diff --git a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex index f77a4c609..0a71df4d2 100644 --- a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex @@ -82,23 +82,17 @@ if Enum.all?( def handle_other({:new_tracks, tracks}, ctx, state) do new_tracks = Map.new(tracks, &{&1.id, &1}) - subscriptions = - tracks - |> Enum.filter(fn track -> :raw in track.format end) - |> Enum.map(fn track -> {track.id, :raw} end) - {:endpoint, endpoint_id} = ctx.name - case Engine.subscribe(state.rtc_engine, subscriptions, endpoint_id) do - :ok -> - {:ok, Map.update!(state, :tracks, &Map.merge(&1, new_tracks))} - - {:error, track_id, reason} -> - raise "Subscription fails on track: #{track_id} because of #{reason}" + Enum.each(tracks, fn track -> + case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :RTP) do + :ok -> + {:ok, Map.update!(state, :tracks, &Map.merge(&1, new_tracks))} - {:error, :timeout} -> - raise "Timeout subscribing on track in Engine with pid #{state.rtc_engine}" - end + {:error, reason} -> + raise "Couldn't subscribe for track: #{inspect(track.id)}. Reason: #{inspect(reason)}" + end + end) end @impl true diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex index f117a41d6..b1f4d66d2 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex @@ -24,7 +24,16 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do def_output_pad :output, availability: :on_request, mode: :push, - caps: Membrane.RTP + caps: Membrane.RTP, + options: [ + default_simulcast_encoding: [ + spec: String.t() | nil, + default: nil, + description: """ + TODO + """ + ] + ] @typedoc """ Notifies that encoding for endpoint with id `endpoint_id` was switched to encoding `encoding`. @@ -58,9 +67,14 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do end @impl true - def handle_pad_added(Pad.ref(:output, {:endpoint, endpoint_id}), _context, state) do + def handle_pad_added(Pad.ref(:output, {:endpoint, endpoint_id}), context, state) do forwarder = - Forwarder.new(state.track.encoding, state.track.clock_rate, state.track.simulcast_encodings) + Forwarder.new( + state.track.encoding, + state.track.clock_rate, + state.track.simulcast_encodings, + context.options[:default_simulcast_encoding] + ) forwarder = Enum.reduce(state.inactive_encodings, forwarder, fn encoding, forwarder -> diff --git a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex index 7c6c568b7..8ace5684a 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex @@ -243,25 +243,22 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do new_outbound_tracks = Enum.map(new_outbound_tracks, &to_rtc_track(&1, state.track_id_to_metadata)) - subscriptions = Enum.map(new_outbound_tracks, fn track -> {track.id, :RTP} end) - {:endpoint, endpoint_id} = ctx.name - case Engine.subscribe(state.rtc_engine, subscriptions, endpoint_id) do - :ok -> - send_if_not_nil( - state.display_manager, - {:subscribe_tracks, ctx.name, new_outbound_tracks} - ) + opts = [default_simulcast_encoding: "m"] - {:ok, state} + Enum.each(new_outbound_tracks, fn track -> + case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :RTP, opts) do + :ok -> + :ok - {:error, track_id, reason} -> - raise "Couldn't subscribe for track: #{inspect(track_id)}. Reason: #{inspect(reason)}" + {:error, reason} -> + raise "Couldn't subscribe for track: #{inspect(track.id)}. Reason: #{inspect(reason)}" + end + end) - {:error, :timeout} -> - raise "Timeout subscribing on track in Engine with pid #{inspect(state.rtc_engine)}" - end + send_if_not_nil(state.display_manager, {:subscribe_tracks, ctx.name, new_outbound_tracks}) + {:ok, state} end @impl true diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index eea9e912c..d15417cb1 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -200,7 +200,8 @@ defmodule Membrane.RTC.Engine do Message, Track, Peer, - DisplayManager + DisplayManager, + Subscription } alias Membrane.RTC.Engine @@ -405,6 +406,8 @@ defmodule Membrane.RTC.Engine do :ok end + @type subscription_opts_t() :: [default_simulcast_encoding: String.t()] + @doc """ Subscribes endpoint for tracks. @@ -416,20 +419,27 @@ defmodule Membrane.RTC.Engine do """ @spec subscribe( rtc_engine :: pid(), - subscriptions :: [{Track.id(), atom()}], - endpoint_id :: String.t() + endpoint_id :: String.t(), + track_id :: Track.id(), + format :: atom(), + opts :: subscription_opts_t ) :: - :ok | {:error, :timeout} | {:error, Track.id(), :no_such_track | :invalid_track_format} - def subscribe(rtc_engine, subscriptions, endpoint_id) do + :ok + | {:error, + :timeout + | :invalid_track_id + | :invalid_track_format + | :invalid_default_simulcast_encoding} + def subscribe(rtc_engine, endpoint_id, track_id, format, opts \\ []) do ref = make_ref() - send(rtc_engine, {:subscribe, subscriptions, self(), endpoint_id, ref}) + send(rtc_engine, {:subscribe, {self(), ref}, endpoint_id, track_id, format, opts}) receive do {^ref, :ok} -> :ok - {^ref, {:error, track_id, reason}} -> - {:error, track_id, reason} + {^ref, {:error, reason}} -> + {:error, reason} after 5_000 -> {:error, :timeout} @@ -462,7 +472,7 @@ defmodule Membrane.RTC.Engine do trace_context: trace_ctx, peers: %{}, endpoints: %{}, - waiting_for_linking: %{}, + pending_subscriptions: [], filters: %{}, subscriptions: %{}, display_manager: display_manager @@ -570,55 +580,29 @@ defmodule Membrane.RTC.Engine do @decorate trace("engine.other.subscribe", include: [[:state, :id]]) def handle_other( - {:subscribe, tracks_formats, endpoint_pid, endpoint_id, ref}, + {:subscribe, {endpoint_pid, ref}, endpoint_id, track_id, format, opts}, ctx, state ) do - all_tracks = - state.endpoints - |> Map.values() - |> Enum.flat_map(&Endpoint.get_tracks/1) - |> Map.new(&{&1.id, &1}) - - incorrect_subscriptions = - tracks_formats - |> Enum.map(fn {track_id, format} -> - track = Map.get(all_tracks, track_id) - - cond do - track == nil -> - {:error, track_id, :no_such_track} - - format not in track.format -> - {:error, track_id, :invalid_track_format} - - true -> - :ok - end - end) - |> Enum.drop_while(&(&1 == :ok)) - - if incorrect_subscriptions != [] do - [msg | _] = incorrect_subscriptions - send(endpoint_pid, {ref, msg}) - {:ok, state} - else - {links, state} = link_outbound_tracks(tracks_formats, endpoint_id, ctx, state) + subscription = %Subscription{ + endpoint_id: endpoint_id, + track_id: track_id, + format: format, + opts: opts, + status: :created + } - new_endpoint_subscriptions = Map.new(tracks_formats) + case check_subscription(subscription, state) do + :ok -> + {links, state} = try_fulfill_subscription(subscription, ctx, state) - subscriptions = - Map.update( - state.subscriptions, - endpoint_id, - new_endpoint_subscriptions, - &Map.merge(&1, new_endpoint_subscriptions) - ) + parent_spec = %ParentSpec{links: links, log_metadata: [rtc: state.id]} + send(endpoint_pid, {ref, :ok}) + {{:ok, [spec: parent_spec]}, state} - parent_spec = %ParentSpec{links: links, log_metadata: [rtc: state.id]} - state = %{state | subscriptions: subscriptions} - send(endpoint_pid, {ref, :ok}) - {{:ok, [spec: parent_spec]}, state} + {:error, _reason} = error -> + send(endpoint_pid, {ref, error}) + {:ok, state} end end @@ -655,6 +639,34 @@ defmodule Membrane.RTC.Engine do {:ok, state} end + defp check_subscription(subscription, state) do + # checks whether subscription is proper + track = + state.endpoints + |> Map.values() + |> Enum.flat_map(&Endpoint.get_tracks/1) + |> Map.new(&{&1.id, &1}) + |> Map.get(subscription.track_id) + + default_simulcast_encoding = subscription.opts[:default_simulcast_encoding] + + cond do + track == nil -> + {:error, :invalid_track_id} + + subscription.format not in track.format -> + {:error, :invalid_format} + + # TODO maybe simulcast_encodings should be always a list + default_simulcast_encoding != nil and track.simulcast_encodings != nil and + default_simulcast_encoding not in track.simulcast_encodings -> + {:error, :invalid_default_simulcast_encoding} + + true -> + :ok + end + end + defp handle_media_event(%{type: :join, data: data}, peer_id, _ctx, state) do peer = Peer.new(peer_id, data.metadata || %{}) dispatch(%Message.NewPeer{rtc_engine: self(), peer: peer}) @@ -869,29 +881,6 @@ defmodule Membrane.RTC.Engine do {{:ok, spec: spec}, state} end - @decorate trace("engine.notification.subscribe", include: [:endpoint_id, [:state, :id]]) - defp do_handle_notification( - {:subscribe, tracks_formats}, - {:endpoint, endpoint_id}, - ctx, - state - ) do - {links, state} = link_outbound_tracks(tracks_formats, endpoint_id, ctx, state) - new_endpoint_subscriptions = Map.new(tracks_formats) - - subscriptions = - Map.update( - state.subscriptions, - endpoint_id, - new_endpoint_subscriptions, - &Map.merge(&1, new_endpoint_subscriptions) - ) - - parent_spec = %ParentSpec{links: links, log_metadata: [rtc: state.id]} - state = %{state | subscriptions: subscriptions} - {{:ok, [spec: parent_spec]}, state} - end - @decorate trace("engine.notification.publish.new_tracks", include: [:endpoint_id, [:state, :id]]) defp do_handle_notification( {:publish, {:new_tracks, tracks}}, @@ -994,78 +983,55 @@ defmodule Membrane.RTC.Engine do end ] - endpoints_to_link = - ctx.children - |> Enum.flat_map(fn - {{:endpoint, endpoint_id}, _data} -> [endpoint_id] - _child -> [] - end) - |> Enum.filter(&MapSet.member?(state.waiting_for_linking[&1], track_id)) + {pending_subscriptions, rest} = + Enum.split_with(state.pending_subscriptions, fn s -> s.track_id == track.id end) - waiting_for_linking = - Enum.reduce(endpoints_to_link, state.waiting_for_linking, fn endpoint_id, - waiting_for_linking -> - Map.update!(waiting_for_linking, endpoint_id, &MapSet.delete(&1, track_id)) + {links, state} = + Enum.flat_map_reduce(pending_subscriptions, state, fn subscription, state -> + fulfill_subscription(subscription, ctx, state) end) - {endpoints_handling_raw_format, endpoints_handling_remote_format} = - Enum.split_with(endpoints_to_link, fn endpoint_id -> - format = state.subscriptions[endpoint_id][track_id] - format == :raw - end) + state = %{state | pending_subscriptions: rest} + {endpoint_to_tee_links ++ links, state} + end - raw_format_links = - if endpoints_handling_raw_format == [] do + defp try_fulfill_subscription(subscription, ctx, state) do + if Map.has_key?(ctx.children, {:tee, subscription.track_id}) do + fulfill_subscription(subscription, ctx, state) + else + subscription = %Subscription{subscription | status: :pending} + state = update_in(state, [:pending_subscriptions], &[subscription | &1]) + {[], state} + end + end + + defp fulfill_subscription(%Subscription{format: :raw} = s, ctx, state) do + links = + if Map.has_key?(ctx.children, {:raw_format_tee, s.track_id}) do [] else - prepare_raw_format_links(track_id, state) - end + prepare_raw_format_links(s.track_id, state) + end ++ + prepare_track_to_endpoint_links(s, :raw_format_tee, state) - links_to_raw_format_endpoints = - Enum.map(endpoints_handling_raw_format, fn endpoint_id -> - prepare_track_to_endpoint_links(track_id, endpoint_id, :raw_format_tee) - end) - - links_to_remote_format_endpoints = - Enum.map(endpoints_handling_remote_format, fn endpoint_id -> - prepare_track_to_endpoint_links(track_id, endpoint_id, :tee) - end) - - links = - endpoint_to_tee_links ++ - raw_format_links ++ links_to_raw_format_endpoints ++ links_to_remote_format_endpoints + state = + update_in( + state, + [:subscriptions, s.endpoint_id], + &Map.put(&1, s.track_id, %Subscription{s | status: :active}) + ) - state = %{state | waiting_for_linking: waiting_for_linking} {links, state} end - defp link_outbound_tracks(tracks_formats, endpoint_id, ctx, state) do - {tracks_to_link, tracks_not_to_link} = - Enum.split_with(tracks_formats, fn {track_id, _format} -> - Map.has_key?(ctx.children, {:tee, track_id}) - end) - - links = - Enum.flat_map(tracks_to_link, fn - {track_id, :raw} -> - if Map.has_key?(ctx.children, {:raw_format_tee, track_id}) do - [] - else - prepare_raw_format_links(track_id, state) - end ++ - prepare_track_to_endpoint_links(track_id, endpoint_id, :raw_format_tee) - - {track_id, _remote_format} -> - prepare_track_to_endpoint_links(track_id, endpoint_id, :tee) - end) - - waiting_for_linking = MapSet.new(tracks_not_to_link, fn {track_id, _format} -> track_id end) + defp fulfill_subscription(s, _ctx, state) do + links = prepare_track_to_endpoint_links(s, :tee, state) state = update_in( state, - [:waiting_for_linking, endpoint_id], - &MapSet.union(&1, waiting_for_linking) + [:subscriptions, s.endpoint_id], + &Map.put(&1, s.track_id, %Subscription{s | status: :active}) ) {links, state} @@ -1079,12 +1045,35 @@ defmodule Membrane.RTC.Engine do ] end - defp prepare_track_to_endpoint_links(track_id, endpoint_id, tee_kind) do + defp prepare_track_to_endpoint_links(subscription, :tee, state) do + track = + state.endpoints + |> Map.values() + |> Enum.flat_map(&Endpoint.get_tracks/1) + |> Map.new(&{&1.id, &1}) + |> Map.get(subscription.track_id) + + options = + if track.simulcast_encodings != nil do + [default_simulcast_encoding: subscription.opts[:default_simulcast_encoding]] + else + [] + end + + [ + link({:tee, subscription.track_id}) + |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id}), options: options) + |> via_in(Pad.ref(:input, subscription.track_id)) + |> to({:endpoint, subscription.endpoint_id}) + ] + end + + defp prepare_track_to_endpoint_links(subscription, tee_kind, _state) do [ - link({tee_kind, track_id}) - |> via_out(Pad.ref(:output, {:endpoint, endpoint_id})) - |> via_in(Pad.ref(:input, track_id)) - |> to({:endpoint, endpoint_id}) + link({tee_kind, subscription.track_id}) + |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id})) + |> via_in(Pad.ref(:input, subscription.track_id)) + |> to({:endpoint, subscription.endpoint_id}) ] end @@ -1136,10 +1125,7 @@ defmodule Membrane.RTC.Engine do forward: {endpoint_name, {:new_tracks, outbound_tracks}} ] - state = - state - |> put_in([:waiting_for_linking, endpoint_id], MapSet.new()) - |> put_in([:subscriptions, endpoint_id], %{}) + state = put_in(state, [:subscriptions, endpoint_id], %{}) spec = %ParentSpec{ node: opts[:node], @@ -1189,10 +1175,16 @@ defmodule Membrane.RTC.Engine do end end + # TODO `peer_id` -> `endpoint_id` defp do_remove_endpoint(peer_id, ctx, state) do if Map.has_key?(state.endpoints, peer_id) do {endpoint, state} = pop_in(state, [:endpoints, peer_id]) - {_waiting, state} = pop_in(state, [:waiting_for_linking, peer_id]) + + state = + update_in(state, [:waiting_subscriptions, peer_id], fn subscriptions -> + Enum.filter(subscriptions, fn s -> s.endpoint_id != peer_id end) + end) + {_subscriptions, state} = pop_in(state, [:subscriptions, peer_id]) tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true}) diff --git a/lib/membrane_rtc_engine/subscription.ex b/lib/membrane_rtc_engine/subscription.ex new file mode 100644 index 000000000..4b7ee3aef --- /dev/null +++ b/lib/membrane_rtc_engine/subscription.ex @@ -0,0 +1,32 @@ +defmodule Membrane.RTC.Engine.Subscription do + @moduledoc false + # Module representing subscription for track + alias Membrane.RTC.Engine.{Endpoint, Track} + + @typedoc """ + Subscription options. + + * `default_simulcast_encoding` - initial encoding that + endpoint making subscription wants to receive + """ + @type opts_t :: [default_simulcast_encoding: String.t()] + + @typedoc """ + * `endpoint_id` - id of endpoint making subscription for track + * `track_id` - id of track endpoint subscribes for + * `format` - format of track endpoint subscribes for + * `status` - status of subscription. Subscription is `active` when + given track is linked to given endpoint and `pending` otherwise + * `opts` - additional options + """ + @type t() :: %___MODULE__{ + endpoint_id: Endpoint.id(), + track_id: Track.id(), + format: Track.format(), + status: :created | :pending | :active, + opts: opts_t() + } + + @enforce_keys [:endpoint_id, :track_id, :format, :status, :opts] + defstruct @enforce_keys +end From 4dda8fa2a3e04876494874d6bf2ea2bdb09e6439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Fri, 8 Apr 2022 17:03:28 +0200 Subject: [PATCH 06/19] Refactor code --- .../endpoints/webrtc/simulcast_tee.ex | 3 +- lib/membrane_rtc_engine/engine.ex | 144 +++++++++--------- lib/membrane_rtc_engine/subscription.ex | 26 ++-- 3 files changed, 82 insertions(+), 91 deletions(-) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex index b1f4d66d2..7a9f07486 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_tee.ex @@ -30,7 +30,8 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee do spec: String.t() | nil, default: nil, description: """ - TODO + Initial encoding that should be sent via this pad. + `nil` means that the best possible encoding should be used. """ ] ] diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index d15417cb1..a00074c24 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -165,10 +165,10 @@ defmodule Membrane.RTC.Engine do Where `caps` are `t:Membrane.Caps.t/0` or `:any`. * publish for some tracks using actions `t:publish_action_t/0` and subscribe for some tracks using - function `#{inspect(__MODULE__)}.subscribe/3`. The first will cause RTC Engine to send a message in + function `#{inspect(__MODULE__)}.subscribe/5`. The first will cause RTC Engine to send a message in form of `{:new_tracks, tracks}` where `tracks` is a list of `t:#{inspect(__MODULE__)}.Track.t/0` to all other Endpoints. When an Endpoint receives such a message it can subscribe for new tracks by - using `#{inspect(__MODULE__)}.subscribe/3` function. An Endpoint will be notified about track readiness + using `#{inspect(__MODULE__)}.subscribe/5` function. An Endpoint will be notified about track readiness it subscribed for in `c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added` callback can look like this @@ -240,6 +240,14 @@ defmodule Membrane.RTC.Engine do node: node() ] + @typedoc """ + Subscription options. + + * `default_simulcast_encoding` - initial encoding that + endpoint making subscription wants to receive + """ + @type subscription_opts_t() :: [default_simulcast_encoding: String.t()] + @typedoc """ Membrane action that will cause RTC Engine to publish some message to all other endpoints. """ @@ -406,8 +414,6 @@ defmodule Membrane.RTC.Engine do :ok end - @type subscription_opts_t() :: [default_simulcast_encoding: String.t()] - @doc """ Subscribes endpoint for tracks. @@ -588,14 +594,12 @@ defmodule Membrane.RTC.Engine do endpoint_id: endpoint_id, track_id: track_id, format: format, - opts: opts, - status: :created + opts: opts } case check_subscription(subscription, state) do :ok -> {links, state} = try_fulfill_subscription(subscription, ctx, state) - parent_spec = %ParentSpec{links: links, log_metadata: [rtc: state.id]} send(endpoint_pid, {ref, :ok}) {{:ok, [spec: parent_spec]}, state} @@ -639,34 +643,6 @@ defmodule Membrane.RTC.Engine do {:ok, state} end - defp check_subscription(subscription, state) do - # checks whether subscription is proper - track = - state.endpoints - |> Map.values() - |> Enum.flat_map(&Endpoint.get_tracks/1) - |> Map.new(&{&1.id, &1}) - |> Map.get(subscription.track_id) - - default_simulcast_encoding = subscription.opts[:default_simulcast_encoding] - - cond do - track == nil -> - {:error, :invalid_track_id} - - subscription.format not in track.format -> - {:error, :invalid_format} - - # TODO maybe simulcast_encodings should be always a list - default_simulcast_encoding != nil and track.simulcast_encodings != nil and - default_simulcast_encoding not in track.simulcast_encodings -> - {:error, :invalid_default_simulcast_encoding} - - true -> - :ok - end - end - defp handle_media_event(%{type: :join, data: data}, peer_id, _ctx, state) do peer = Peer.new(peer_id, data.metadata || %{}) dispatch(%Message.NewPeer{rtc_engine: self(), peer: peer}) @@ -995,45 +971,61 @@ defmodule Membrane.RTC.Engine do {endpoint_to_tee_links ++ links, state} end + defp check_subscription(subscription, state) do + # checks whether subscription is correct + track = get_track(subscription.track_id, state.endpoints) + default_simulcast_encoding = subscription.opts[:default_simulcast_encoding] + + cond do + track == nil -> + {:error, :invalid_track_id} + + subscription.format not in track.format -> + {:error, :invalid_format} + + # TODO maybe simulcast_encodings should be always a list + default_simulcast_encoding != nil and track.simulcast_encodings != nil and + default_simulcast_encoding not in track.simulcast_encodings -> + {:error, :invalid_default_simulcast_encoding} + + true -> + :ok + end + end + defp try_fulfill_subscription(subscription, ctx, state) do + # if tee for this track is already spawned, fulfill subscription + # otherwise, save subscription as pending, we will fulfill it + # when tee appears if Map.has_key?(ctx.children, {:tee, subscription.track_id}) do fulfill_subscription(subscription, ctx, state) else - subscription = %Subscription{subscription | status: :pending} state = update_in(state, [:pending_subscriptions], &[subscription | &1]) {[], state} end end defp fulfill_subscription(%Subscription{format: :raw} = s, ctx, state) do - links = + raw_format_links = if Map.has_key?(ctx.children, {:raw_format_tee, s.track_id}) do [] else prepare_raw_format_links(s.track_id, state) - end ++ - prepare_track_to_endpoint_links(s, :raw_format_tee, state) + end - state = - update_in( - state, - [:subscriptions, s.endpoint_id], - &Map.put(&1, s.track_id, %Subscription{s | status: :active}) - ) + {links, state} = do_fulfill_subscription(s, :raw_format_tee, state) - {links, state} + {raw_format_links ++ links, state} end - defp fulfill_subscription(s, _ctx, state) do - links = prepare_track_to_endpoint_links(s, :tee, state) - - state = - update_in( - state, - [:subscriptions, s.endpoint_id], - &Map.put(&1, s.track_id, %Subscription{s | status: :active}) - ) + defp fulfill_subscription(%Subscription{format: _remote_format} = s, _ctx, state) do + do_fulfill_subscription(s, :tee, state) + end + defp do_fulfill_subscription(s, tee_kind, state) do + links = prepare_track_to_endpoint_links(s, tee_kind, state) + s = %Subscription{s | status: :active} + state = update_in(state, [:subscriptions, s.endpoint_id], &Map.put(&1, s.track_id, s)) {links, state} end @@ -1046,12 +1038,9 @@ defmodule Membrane.RTC.Engine do end defp prepare_track_to_endpoint_links(subscription, :tee, state) do - track = - state.endpoints - |> Map.values() - |> Enum.flat_map(&Endpoint.get_tracks/1) - |> Map.new(&{&1.id, &1}) - |> Map.get(subscription.track_id) + # if someone subscribed for simulcast track, prepare options + # for SimulcastTee + track = get_track(subscription.track_id, state.endpoints) options = if track.simulcast_encodings != nil do @@ -1142,6 +1131,14 @@ defmodule Membrane.RTC.Engine do defp get_outbound_tracks(endpoints), do: Enum.flat_map(endpoints, fn {_id, endpoint} -> Endpoint.get_tracks(endpoint) end) + defp get_track(track_id, endpoints) do + endpoints + |> Map.values() + |> Enum.flat_map(&Endpoint.get_tracks/1) + |> Map.new(&{&1.id, &1}) + |> Map.get(track_id) + end + defp handle_remove_peer(peer_id, reason, ctx, state) do case do_remove_peer(peer_id, reason, ctx, state) do {:absent, [], state} -> @@ -1175,28 +1172,27 @@ defmodule Membrane.RTC.Engine do end end - # TODO `peer_id` -> `endpoint_id` - defp do_remove_endpoint(peer_id, ctx, state) do - if Map.has_key?(state.endpoints, peer_id) do - {endpoint, state} = pop_in(state, [:endpoints, peer_id]) + defp do_remove_endpoint(endpoint_id, ctx, state) do + if Map.has_key?(state.endpoints, endpoint_id) do + {endpoint, state} = pop_in(state, [:endpoints, endpoint_id]) + {_subscriptions, state} = pop_in(state, [:subscriptions, endpoint_id]) state = - update_in(state, [:waiting_subscriptions, peer_id], fn subscriptions -> - Enum.filter(subscriptions, fn s -> s.endpoint_id != peer_id end) + update_in(state, [:pending_subscriptions, endpoint_id], fn subscriptions -> + Enum.filter(subscriptions, fn s -> s.endpoint_id != endpoint_id end) end) - {_subscriptions, state} = pop_in(state, [:subscriptions, peer_id]) tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true}) - tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, peer_id}, state) + tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, endpoint_id}, state) - endpoint_bin = ctx.children[{:endpoint, peer_id}] + endpoint_bin = ctx.children[{:endpoint, endpoint_id}] actions = if endpoint_bin == nil or endpoint_bin.terminating? do [] else - [remove_child: find_children_for_endpoint(endpoint, peer_id, ctx)] + [remove_child: find_children_for_endpoint(endpoint, endpoint_id, ctx)] end {:present, tracks_msgs ++ actions, state} @@ -1205,13 +1201,13 @@ defmodule Membrane.RTC.Engine do end end - defp find_children_for_endpoint(endpoint, peer_id, ctx) do + defp find_children_for_endpoint(endpoint, endpoint_id, ctx) do children = endpoint |> Endpoint.get_tracks() - |> Enum.flat_map(fn track -> get_track_elements(peer_id, track.id, ctx) end) + |> Enum.flat_map(fn track -> get_track_elements(endpoint_id, track.id, ctx) end) - [endpoint: peer_id] ++ children + [endpoint: endpoint_id] ++ children end defp get_track_elements(endpoint_id, track_id, ctx) do diff --git a/lib/membrane_rtc_engine/subscription.ex b/lib/membrane_rtc_engine/subscription.ex index 4b7ee3aef..fd22a5789 100644 --- a/lib/membrane_rtc_engine/subscription.ex +++ b/lib/membrane_rtc_engine/subscription.ex @@ -1,32 +1,26 @@ defmodule Membrane.RTC.Engine.Subscription do @moduledoc false # Module representing subscription for track + alias Membrane.RTC.Engine alias Membrane.RTC.Engine.{Endpoint, Track} @typedoc """ - Subscription options. - - * `default_simulcast_encoding` - initial encoding that - endpoint making subscription wants to receive - """ - @type opts_t :: [default_simulcast_encoding: String.t()] - - @typedoc """ - * `endpoint_id` - id of endpoint making subscription for track + * `endpoint_id` - id of endpoint making subscription * `track_id` - id of track endpoint subscribes for * `format` - format of track endpoint subscribes for - * `status` - status of subscription. Subscription is `active` when - given track is linked to given endpoint and `pending` otherwise + * `status` - status of subscription. Subscription is + `active` when track some endpoint subscribed for is linked + to this endpoint and `pending` otherwise * `opts` - additional options """ - @type t() :: %___MODULE__{ + @type t() :: %__MODULE__{ endpoint_id: Endpoint.id(), track_id: Track.id(), format: Track.format(), - status: :created | :pending | :active, - opts: opts_t() + status: :pending | :active, + opts: Engine.subscription_opts_t() } - @enforce_keys [:endpoint_id, :track_id, :format, :status, :opts] - defstruct @enforce_keys + @enforce_keys [:endpoint_id, :track_id, :format] + defstruct @enforce_keys ++ [status: :pending, opts: []] end From 50e07ae2c171f64552c1a037a9f366674c60e147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:24:25 +0200 Subject: [PATCH 07/19] Update guide for simulcast. Add SimulcastConfig --- guides/simulcast.md | 36 +++++++++++++------ .../endpoints/webrtc/simulcast_config.ex | 24 +++++++++++++ .../endpoints/webrtc_endpoint.ex | 22 +++++------- 3 files changed, 58 insertions(+), 24 deletions(-) create mode 100644 lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex diff --git a/guides/simulcast.md b/guides/simulcast.md index ff3caa350..746034657 100644 --- a/guides/simulcast.md +++ b/guides/simulcast.md @@ -4,21 +4,19 @@ Simulcast is a technique where a client sends multiple encodings of the same vid * receiver awailable bandwidth * receiver preferences (e.g. explicit request to receive video in HD resolution instead of FHD) -* UI layaout (e.g. videos being displayed in smaller video tiles will be sent in lower resolution) +* UI layaout (e.g. videos being displayed in smaller video tiles will be sent in a lower resolution) -At the moment, Membrane supports only receiver preferences i.e. receiver can chose which encoding it is willing to receive. Additionaly, sender can turn off/on specific encoding. Membrane RTC Engine will detect changes and switch to another available encoding.Simulcast is a technique where client sends multiple encodings (different resolutions) of the same -video to a server and the server forwards proper encoding to each other client basing on -client preferences, network bandwidth or UI layaout. +At the moment, Membrane supports only receiver preferences i.e. receiver can chose which encoding it is willing to receive. Additionaly, sender can turn off/on specific encoding. Membrane RTC Engine will detect changes and switch to another available encoding. ## Turning simulcast on/off -On the client side simulcast can be enabled while adding new track e.g.: +On the client side simulcast can be enabled while adding a new track e.g.: ```ts // create MembraneWebRTC class instance // ... // add simulcasted track - let trackId = webrtc.addTrack(track, stream, {}, true); + let trackId = webrtc.addTrack(track, stream, {}, {enabled: true, active_encodings: ["l", "m", "h"]}); ``` This will add a new track that will be sent in three versions: @@ -26,7 +24,8 @@ This will add a new track that will be sent in three versions: * original scaled down by 2 (identified as `m`) * original scaled down by 4 (identified as `l`) -Those settings are not configurable at the moment. +You can turn off some of the encodings by excluding them from `active_encodings` list. +Encodings that are turned off might still be enabled using `enableTrackEncoding` function. > #### Minimal required resolution {: .warning} > @@ -36,9 +35,24 @@ Those settings are not configurable at the moment. > passed to [`getUserMedia`](https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia) > or [`getDisplayMedia`](https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia). -On the server side, simulcast can be disabled while adding new WebRTC Endpoint by setting its `simulcast?` -option to `false`. -This will result in rejecting all incoming simulcast tracks i.e. client will not send them. +On the server side, simulcast can be configured while adding new WebRTC Endpoint by setting its `simulcast_config` option. + +For example + +```elixir +%WebRTC{ + rtc_engine: rtc_engine, + # ... + simulcast_config: %SimulcastConfig{ + enabled: true, + default_encoding: fn %Track{simulcast_encodings: _simulcast_encodings} -> "m" end + } +} +``` + +Here we turn simulcast on and choose medium encoding for each track to be forwarded to the client. + +On the other hand, setting `enabled` to `false` will result in rejecting all incoming simulcast tracks i.e. client will not send them to the server. ## Disabling and enabling specific track encoding @@ -58,7 +72,7 @@ Disabled encoding can be turned on again using `enableTrackEncoding` function. Membrane RTC Engine tracks encoding activity. Therefore, when some encoding is turned off, RTC Engine will detect this and switch to the highest awailable encoding. -If turned off encoding returns, RTC Engine will switch back to it. +When disabled encoding becomes active again, RTC Engine will switch back to it. ## Selecting encoding to receive diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex new file mode 100644 index 000000000..66a4c986c --- /dev/null +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex @@ -0,0 +1,24 @@ +defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig do + @moduledoc """ + Module representing simulcast configuration for WebRTC endpoint. + """ + + alias Membrane.RTC.Engine.Track + + @typedoc """ + * `enabled` - whether to accept simulcast tracks or not. + Setting this to false will result in rejecting all incoming + simulcast tracks i.e. client will not send them. + * `default_encoding` - function used to determine initial encoding + this endpoint is willing to receive for given track. + It is called for each track this endpoint subscribes for. + """ + @type t() :: %__MODULE__{ + enabled: boolean(), + default_encoding: (Track.t() -> String.t()) + } + defstruct [ + :default_encoding, + enabled: false + ] +end diff --git a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex index 8ace5684a..bf7643a7e 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex @@ -13,6 +13,7 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do alias Membrane.RTC.Engine alias ExSDP.Attribute.FMTP alias ExSDP.Attribute.RTPMapping + alias Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig require Membrane.Logger @@ -147,15 +148,10 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do For more information refer to RFC 5104 section 4.3.1. """ ], - simulcast?: [ - spec: boolean(), - default: true, - description: """ - Whether to accept simulcast tracks or not. - - Setting this to false will result in rejecting all incoming - simulcast tracks i.e. client will not send them. - """ + simulcast_config: [ + spec: SimulcastConfig.t(), + default: %SimulcastConfig{}, + description: "Simulcast configuration" ] def_input_pad :input, @@ -185,7 +181,7 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do trace_metadata: [name: opts.ice_name], rtcp_receiver_report_interval: opts.rtcp_receiver_report_interval, rtcp_sender_report_interval: opts.rtcp_sender_report_interval, - simulcast?: opts.simulcast? + simulcast?: opts.simulcast_config.enabled } spec = %ParentSpec{ @@ -203,7 +199,8 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do integrated_turn_domain: opts.integrated_turn_domain, owner: opts.owner, video_tracks_limit: opts.video_tracks_limit, - rtcp_fir_interval: opts.rtcp_fir_interval + rtcp_fir_interval: opts.rtcp_fir_interval, + simulcast_config: opts.simulcast_config } {{:ok, spec: spec, log_metadata: opts.log_metadata}, state} @@ -245,9 +242,8 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do {:endpoint, endpoint_id} = ctx.name - opts = [default_simulcast_encoding: "m"] - Enum.each(new_outbound_tracks, fn track -> + opts = [default_simulcast_encoding: state.simulcast_config.default_encoding.(track)] case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :RTP, opts) do :ok -> :ok From 33570579d8ea0c7ca964c1c89b4ed5f77645a43d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:24:45 +0200 Subject: [PATCH 08/19] Fix cleaning pending subscriptions --- lib/membrane_rtc_engine/engine.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index a00074c24..cda829c01 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -1178,7 +1178,7 @@ defmodule Membrane.RTC.Engine do {_subscriptions, state} = pop_in(state, [:subscriptions, endpoint_id]) state = - update_in(state, [:pending_subscriptions, endpoint_id], fn subscriptions -> + update_in(state, [:pending_subscriptions], fn subscriptions -> Enum.filter(subscriptions, fn s -> s.endpoint_id != endpoint_id end) end) From 1685bf177cfed3473d8073a2ad5389878d87b8bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:24:58 +0200 Subject: [PATCH 09/19] Format code --- lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex index bf7643a7e..26c9a5f4c 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc_endpoint.ex @@ -244,6 +244,7 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC do Enum.each(new_outbound_tracks, fn track -> opts = [default_simulcast_encoding: state.simulcast_config.default_encoding.(track)] + case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :RTP, opts) do :ok -> :ok From ca667befd92d9b45b8717315321b6266589115e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:32:22 +0200 Subject: [PATCH 10/19] Refactor code, polish docs --- assets/js/membraneWebRTC.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/assets/js/membraneWebRTC.ts b/assets/js/membraneWebRTC.ts index 527ba981b..c003e51e7 100644 --- a/assets/js/membraneWebRTC.ts +++ b/assets/js/membraneWebRTC.ts @@ -89,8 +89,14 @@ export interface TrackContext { /** * Type describing possible track encodings. - * At the moment, if track was added as a simulcast one ({@link addTrack}) - * it will be transmitted to the server in three versions - low, medium and high. + * `"h"` - original encoding + * `"m"` - original encoding scaled down by 2 + * `"l"` - original encoding scaled down by 4 + * + * Notice that to make all encodings work, the initial + * resolution has to be at least 1280x720. + * In other case, browser might not be able to scale + * some encodings down. */ export type TrackEncoding = "l" | "m" | "h"; @@ -308,7 +314,7 @@ export class MembraneWebRTC { stream: null, track: null, trackId, - simulcastConfig: { enabled: false, encodings: [], active_encodings: [] }, + simulcastConfig: { enabled: false, active_encodings: [] }, metadata, peer, maxBandwidth: 0, From 2fba29641c2517fce9eb7f8528bcd5e2301424c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:44:59 +0200 Subject: [PATCH 11/19] Provide default implementation of default_encoding function --- .../endpoints/webrtc/simulcast_config.ex | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex index 66a4c986c..01c6747ff 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex @@ -12,13 +12,19 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig do * `default_encoding` - function used to determine initial encoding this endpoint is willing to receive for given track. It is called for each track this endpoint subscribes for. + If not provided, the highest possible encoding will be used. """ @type t() :: %__MODULE__{ enabled: boolean(), default_encoding: (Track.t() -> String.t()) } - defstruct [ - :default_encoding, - enabled: false - ] + defstruct enabled: false, + default_encoding: &__MODULE__.default_encoding/1 + + @doc """ + Default implementation of `default_encoding` function in `t:t/0`. + + Returns nil, which will result in choosing the highest possible encoding. + """ + def default_encoding(_track), do: nil end From a3d8419407930f5a3dd443e948f3201bdc490617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 13:54:29 +0200 Subject: [PATCH 12/19] Make credo happy --- lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex index 01c6747ff..7c0e37a6f 100644 --- a/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex +++ b/lib/membrane_rtc_engine/endpoints/webrtc/simulcast_config.ex @@ -16,7 +16,7 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig do """ @type t() :: %__MODULE__{ enabled: boolean(), - default_encoding: (Track.t() -> String.t()) + default_encoding: (Track.t() -> String.t() | nil) } defstruct enabled: false, default_encoding: &__MODULE__.default_encoding/1 @@ -26,5 +26,6 @@ defmodule Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig do Returns nil, which will result in choosing the highest possible encoding. """ + @spec default_encoding(Track.t()) :: nil def default_encoding(_track), do: nil end From 26b2be86df54dab470ca0d1fd27bc2d4ce5eefac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 11 Apr 2022 23:45:30 +0200 Subject: [PATCH 13/19] Refactor `addTrackToConnection` --- assets/js/membraneWebRTC.ts | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/assets/js/membraneWebRTC.ts b/assets/js/membraneWebRTC.ts index c003e51e7..a71e67b11 100644 --- a/assets/js/membraneWebRTC.ts +++ b/assets/js/membraneWebRTC.ts @@ -507,17 +507,35 @@ export class MembraneWebRTC { } private addTrackToConnection = (trackContext: TrackContext) => { + let transceiverConfig = this.createTransceiverConfig(trackContext); const track = trackContext.track!!; + this.connection!.addTransceiver(track, transceiverConfig); + }; + + private createTransceiverConfig(trackContext: TrackContext): RTCRtpTransceiverInit { let transceiverConfig: RTCRtpTransceiverInit; + if (trackContext.track!!.kind === "audio") { + transceiverConfig = this.createAudioTransceiverConfig(trackContext); + } else { + transceiverConfig = this.createVideoTransceiverConfig(trackContext); + } + + return transceiverConfig; + } + + private createAudioTransceiverConfig(_trackContext: TrackContext): RTCRtpTransceiverInit { + return { direction: "sendonly" }; + } + + private createVideoTransceiverConfig(trackContext: TrackContext): RTCRtpTransceiverInit { + let transceiverConfig: RTCRtpTransceiverInit; if (trackContext.simulcastConfig.enabled) { - transceiverConfig = - track.kind === "audio" ? { direction: "sendonly" } : simulcastTransceiverConfig; + transceiverConfig = simulcastTransceiverConfig; + let trackActiveEncodings = trackContext.simulcastConfig.active_encodings; let disabledTrackEncodings: TrackEncoding[] = []; transceiverConfig.sendEncodings?.forEach((encoding) => { - if ( - trackContext.simulcastConfig.active_encodings.includes(encoding.rid! as TrackEncoding) - ) { + if (trackActiveEncodings.includes(encoding.rid! as TrackEncoding)) { encoding.active = true; } else { disabledTrackEncodings.push(encoding.rid! as TrackEncoding); @@ -538,8 +556,9 @@ export class MembraneWebRTC { transceiverConfig.sendEncodings![0].maxBitrate = trackContext.maxBandwidth * 1024; // convert to bps; } } - this.connection!.addTransceiver(track, transceiverConfig); - }; + + return transceiverConfig; + } /** * Replaces a track that is being sent to the RTC Engine. From 64d57e9a08279b8542ed5a94c1a8f2e5085876b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Tue, 12 Apr 2022 12:31:19 +0200 Subject: [PATCH 14/19] Fix HLS endpoint --- lib/membrane_rtc_engine/endpoints/hls_endpoint.ex | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex index 0a71df4d2..ce9421067 100644 --- a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex @@ -80,14 +80,13 @@ if Enum.all?( @impl true def handle_other({:new_tracks, tracks}, ctx, state) do - new_tracks = Map.new(tracks, &{&1.id, &1}) - {:endpoint, endpoint_id} = ctx.name + tracks = Enum.filter(tracks, fn track -> :raw in track.format end) Enum.each(tracks, fn track -> - case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :RTP) do + case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :raw) do :ok -> - {:ok, Map.update!(state, :tracks, &Map.merge(&1, new_tracks))} + {:ok, Map.update!(state, :tracks, &Map.put(&1, track.id, track))} {:error, reason} -> raise "Couldn't subscribe for track: #{inspect(track.id)}. Reason: #{inspect(reason)}" From 37c754d7a6d4c03a1b14be5ecf7f9066fdb819d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Tue, 12 Apr 2022 12:36:16 +0200 Subject: [PATCH 15/19] Refacto engine.ex --- lib/membrane_rtc_engine/engine.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index cda829c01..45984b919 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -959,15 +959,15 @@ defmodule Membrane.RTC.Engine do end ] - {pending_subscriptions, rest} = + {pending_track_subscriptions, pending_rest_subscriptions} = Enum.split_with(state.pending_subscriptions, fn s -> s.track_id == track.id end) {links, state} = - Enum.flat_map_reduce(pending_subscriptions, state, fn subscription, state -> + Enum.flat_map_reduce(pending_track_subscriptions, state, fn subscription, state -> fulfill_subscription(subscription, ctx, state) end) - state = %{state | pending_subscriptions: rest} + state = %{state | pending_subscriptions: pending_rest_subscriptions} {endpoint_to_tee_links ++ links, state} end From 9bc679d8d1845e94b84ddbf211375c8db4474270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Tue, 12 Apr 2022 12:40:19 +0200 Subject: [PATCH 16/19] Refactor HLS endpoint --- lib/membrane_rtc_engine/endpoints/hls_endpoint.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex index ce9421067..9ee3c8367 100644 --- a/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex +++ b/lib/membrane_rtc_engine/endpoints/hls_endpoint.ex @@ -86,7 +86,7 @@ if Enum.all?( Enum.each(tracks, fn track -> case Engine.subscribe(state.rtc_engine, endpoint_id, track.id, :raw) do :ok -> - {:ok, Map.update!(state, :tracks, &Map.put(&1, track.id, track))} + {:ok, put_in(state, [:tracks, track.id], track)} {:error, reason} -> raise "Couldn't subscribe for track: #{inspect(track.id)}. Reason: #{inspect(reason)}" From 44c109110c28da6c85a5dcb656e532b8a845860b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 14 Apr 2022 19:55:46 +0200 Subject: [PATCH 17/19] Refactor code --- lib/membrane_rtc_engine/engine.ex | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index 45984b919..abbe61251 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -960,7 +960,7 @@ defmodule Membrane.RTC.Engine do ] {pending_track_subscriptions, pending_rest_subscriptions} = - Enum.split_with(state.pending_subscriptions, fn s -> s.track_id == track.id end) + Enum.split_with(state.pending_subscriptions, &(&1.track_id == track.id)) {links, state} = Enum.flat_map_reduce(pending_track_subscriptions, state, fn subscription, state -> @@ -1005,27 +1005,29 @@ defmodule Membrane.RTC.Engine do end end - defp fulfill_subscription(%Subscription{format: :raw} = s, ctx, state) do + defp fulfill_subscription(%Subscription{format: :raw} = subscription, ctx, state) do raw_format_links = - if Map.has_key?(ctx.children, {:raw_format_tee, s.track_id}) do + if Map.has_key?(ctx.children, {:raw_format_tee, subscription.track_id}) do [] else - prepare_raw_format_links(s.track_id, state) + prepare_raw_format_links(subscription.track_id, state) end - {links, state} = do_fulfill_subscription(s, :raw_format_tee, state) + {links, state} = do_fulfill_subscription(subscription, :raw_format_tee, state) {raw_format_links ++ links, state} end - defp fulfill_subscription(%Subscription{format: _remote_format} = s, _ctx, state) do - do_fulfill_subscription(s, :tee, state) + defp fulfill_subscription(%Subscription{format: _remote_format} = subscription, _ctx, state) do + do_fulfill_subscription(subscription, :tee, state) end defp do_fulfill_subscription(s, tee_kind, state) do links = prepare_track_to_endpoint_links(s, tee_kind, state) - s = %Subscription{s | status: :active} - state = update_in(state, [:subscriptions, s.endpoint_id], &Map.put(&1, s.track_id, s)) + subscription = %Subscription{s | status: :active} + endpoint_id = subscription.endpoint_id + track_id = subscription.track_id + state = put_in(state, [:subscriptions, endpoint_id, track_id], subscription) {links, state} end @@ -1179,7 +1181,7 @@ defmodule Membrane.RTC.Engine do state = update_in(state, [:pending_subscriptions], fn subscriptions -> - Enum.filter(subscriptions, fn s -> s.endpoint_id != endpoint_id end) + Enum.filter(subscriptions, &(&1.endpoint_id != endpoint_id)) end) tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true}) From 54614ca15a0f7dc7cd53d6c66d2cd7faca70830d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 14 Apr 2022 20:02:11 +0200 Subject: [PATCH 18/19] Refactor `link_inbound_track` function --- lib/membrane_rtc_engine/engine.ex | 37 +++++++++++++++++-------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index abbe61251..8bf49f1e2 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -839,13 +839,19 @@ defmodule Membrane.RTC.Engine do state = put_in(state, [:filters, track_id], depayloading_filter) track = state.endpoints |> Map.fetch!(endpoint_id) |> Endpoint.get_track_by_id(track_id) - {links, state} = link_inbound_track(track_id, rid, track, endpoint_id, ctx, state) + {tee_links, state} = create_tee_and_link(track_id, rid, track, endpoint_id, ctx, state) - spec = %ParentSpec{ - links: links, - crash_group: {endpoint_id, :temporary}, - log_metadata: [rtc: state.id] - } + # check if there are subscriptions for this track and fulfill them + {pending_track_subscriptions, pending_rest_subscriptions} = + Enum.split_with(state.pending_subscriptions, &(&1.track_id == track.id)) + + {subscription_links, state} = + Enum.flat_map_reduce(pending_track_subscriptions, state, fn subscription, state -> + fulfill_subscription(subscription, ctx, state) + end) + + links = tee_links ++ subscription_links + state = %{state | pending_subscriptions: pending_rest_subscriptions} state = update_in( @@ -854,6 +860,12 @@ defmodule Membrane.RTC.Engine do &Endpoint.update_track_encoding(&1, track_id, encoding) ) + spec = %ParentSpec{ + links: links, + crash_group: {endpoint_id, :temporary}, + log_metadata: [rtc: state.id] + } + {{:ok, spec: spec}, state} end @@ -925,7 +937,7 @@ defmodule Membrane.RTC.Engine do {:ok, state} end - defp link_inbound_track(track_id, rid, track, endpoint_id, ctx, state) do + defp create_tee_and_link(track_id, rid, track, endpoint_id, ctx, state) do tee = cond do rid != nil -> @@ -959,16 +971,7 @@ defmodule Membrane.RTC.Engine do end ] - {pending_track_subscriptions, pending_rest_subscriptions} = - Enum.split_with(state.pending_subscriptions, &(&1.track_id == track.id)) - - {links, state} = - Enum.flat_map_reduce(pending_track_subscriptions, state, fn subscription, state -> - fulfill_subscription(subscription, ctx, state) - end) - - state = %{state | pending_subscriptions: pending_rest_subscriptions} - {endpoint_to_tee_links ++ links, state} + {endpoint_to_tee_links, state} end defp check_subscription(subscription, state) do From 1f03816619f944b84f1612d2538e484d4a4e5eb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Tue, 19 Apr 2022 18:47:37 +0200 Subject: [PATCH 19/19] One more refactor --- lib/membrane_rtc_engine/engine.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/membrane_rtc_engine/engine.ex b/lib/membrane_rtc_engine/engine.ex index 8bf49f1e2..ee1aa1eac 100644 --- a/lib/membrane_rtc_engine/engine.ex +++ b/lib/membrane_rtc_engine/engine.ex @@ -839,7 +839,7 @@ defmodule Membrane.RTC.Engine do state = put_in(state, [:filters, track_id], depayloading_filter) track = state.endpoints |> Map.fetch!(endpoint_id) |> Endpoint.get_track_by_id(track_id) - {tee_links, state} = create_tee_and_link(track_id, rid, track, endpoint_id, ctx, state) + {tee_links, state} = create_and_link_tee(track_id, rid, track, endpoint_id, ctx, state) # check if there are subscriptions for this track and fulfill them {pending_track_subscriptions, pending_rest_subscriptions} = @@ -937,7 +937,7 @@ defmodule Membrane.RTC.Engine do {:ok, state} end - defp create_tee_and_link(track_id, rid, track, endpoint_id, ctx, state) do + defp create_and_link_tee(track_id, rid, track, endpoint_id, ctx, state) do tee = cond do rid != nil -> @@ -1025,9 +1025,9 @@ defmodule Membrane.RTC.Engine do do_fulfill_subscription(subscription, :tee, state) end - defp do_fulfill_subscription(s, tee_kind, state) do - links = prepare_track_to_endpoint_links(s, tee_kind, state) - subscription = %Subscription{s | status: :active} + defp do_fulfill_subscription(subscription, tee_kind, state) do + links = prepare_track_to_endpoint_links(subscription, tee_kind, state) + subscription = %Subscription{subscription | status: :active} endpoint_id = subscription.endpoint_id track_id = subscription.track_id state = put_in(state, [:subscriptions, endpoint_id, track_id], subscription)