Skip to content
This repository was archived by the owner on Sep 19, 2024. It is now read-only.

Adjust to simulcast #55

Merged
merged 6 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions assets/js/const.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const TEMPORAL_LAYERS_COUNT = 2;

export const simulcastConfig: RTCRtpTransceiverInit = {
direction: "sendonly",
sendEncodings: [
{
rid: "h",
active: true,
maxBitrate: 1_200_000,
// From 720p to 360p
scaleResolutionDownBy: 2,
// scalabilityMode: "L1T" + TEMPORAL_LAYERS_COUNT,
},
{
rid: "l",
active: true,
maxBitrate: 100_000,
// From 720p to 180p
scaleResolutionDownBy: 4,
// scalabilityMode: "L1T" + TEMPORAL_LAYERS_COUNT,
},
],
};
41 changes: 33 additions & 8 deletions assets/js/membraneWebRTC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
serializeMediaEvent,
} from "./mediaEvent";
import { v4 as uuidv4 } from "uuid";
import { simulcastConfig } from "./const";

/**
* Interface describing Peer.
Expand Down Expand Up @@ -53,6 +54,10 @@ export interface TrackContext {
* It is WebRTC agnostic i.e. it does not contain `mid` or `stream id`.
*/
trackId: string;
/**
* Flag indicating whether track is a simulcast one or not.
*/
isSimulcast: boolean;
/**
* Any info that was passed in {@link addTrack}.
*/
Expand Down Expand Up @@ -245,6 +250,7 @@ export class MembraneWebRTC {
stream: null,
track: null,
trackId,
isSimulcast: false,
metadata,
peer,
};
Expand Down Expand Up @@ -361,23 +367,26 @@ export class MembraneWebRTC {
public addTrack(
track: MediaStreamTrack,
stream: MediaStream,
trackMetadata: any = new Map()
trackMetadata: any = new Map(),
isSimulcast: boolean = false
): string {
if (this.getPeerId() === "") throw "Cannot add tracks before being accepted by the server";
const trackId = this.getTrackId(uuidv4());
this.localTracksWithStreams.push({ track, stream });

this.localPeer.trackIdToMetadata.set(trackId, trackMetadata);
this.localTrackIdToTrack.set(trackId, {
const trackContext = {
track,
stream,
trackId,
peer: this.localPeer,
metadata: trackMetadata,
});
isSimulcast,
};
this.localTrackIdToTrack.set(trackId, trackContext);

if (this.connection) {
this.connection.addTrack(track, stream);
this.addTrackToConnection(trackContext);

this.connection
.getTransceivers()
Expand All @@ -392,6 +401,16 @@ export class MembraneWebRTC {
return trackId;
}

private addTrackToConnection = (trackContext: TrackContext) => {
if (trackContext.isSimulcast) {
const track = trackContext.track!!;
this.connection!.addTransceiver(
track,
track.kind === "audio" ? { direction: "sendonly" } : simulcastConfig
);
} else this.connection!.addTrack(trackContext.track!!, trackContext.stream!!);
};

/**
* Replaces a track that is being sent to the RTC Engine.
* @param track - Audio or video track.
Expand Down Expand Up @@ -652,14 +671,17 @@ export class MembraneWebRTC {
return trackIdToMetadata;
};

private checkIfTrackBelongToPeer = (trackId: string, peer: Peer) =>
Array.from(peer.trackIdToMetadata.keys()).some((track) => trackId.startsWith(track));

private onOfferData = async (offerData: Map<string, number>) => {
if (!this.connection) {
this.connection = new RTCPeerConnection(this.rtcConfig);
this.connection.onicecandidate = this.onLocalCandidate();

this.localTracksWithStreams.forEach(({ track, stream }) => {
this.connection!.addTrack(track, stream);
});
Array.from(this.localTrackIdToTrack.values()).forEach((trackContext) =>
this.addTrackToConnection(trackContext)
);

this.connection.getTransceivers().forEach((trans) => (trans.direction = "sendonly"));
} else {
Expand Down Expand Up @@ -705,8 +727,10 @@ export class MembraneWebRTC {

const trackId = this.midToTrackId.get(mid)!;

if (this.checkIfTrackBelongToPeer(trackId, this.localPeer)) return;

const peer = Array.from(this.idToPeer.values()).filter((peer) =>
Array.from(peer.trackIdToMetadata.keys()).includes(trackId)
this.checkIfTrackBelongToPeer(trackId, peer)
)[0];

const metadata = peer.trackIdToMetadata.get(trackId);
Expand All @@ -716,6 +740,7 @@ export class MembraneWebRTC {
peer: peer,
trackId,
metadata,
isSimulcast: false,
};

this.trackIdToTrack.set(trackId, trackContext);
Expand Down
66 changes: 57 additions & 9 deletions lib/membrane_rtc_engine/endpoints/hls_endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,33 @@ if Enum.all?(
spec: Path.t(),
description: "Path to directory under which HLS output will be saved",
default: "hls_output"
],
owner: [
spec: pid(),
description: """
Pid of parent all notifications will be send to.

These notifications are:
* `{:playlist_playable, content_type, stream_id}`
* `{:cleanup, clean_function, stream_id}`
Comment on lines +28 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Types for these notifications should be specified somewhere so you can reference them with t:some_notification/0
If theye are not let's create issue in HLS repo to fix this

"""
],
framerate: [
spec: integer(),
description: """
Framerate of tracks
""",
default: 30
]

@impl true
def handle_init(opts) do
state = %{
tracks: %{},
stream_ids: MapSet.new(),
output_directory: opts.output_directory
output_directory: opts.output_directory,
owner: opts.owner,
framerate: opts.framerate
}

{:ok, state}
Expand All @@ -49,6 +68,28 @@ if Enum.all?(
{:ok, state}
end

def handle_notification(
{:track_playable, {content_type, _track_id}},
{:hls_sink_bin, stream_id},
_ctx,
state
) do
# notify about playable just when video becomes available
send(state.owner, {:playlist_playable, content_type, stream_id})
{:ok, state}
end

def handle_notification(
{:cleanup, clean_function},
{:hls_sink_bin, stream_id},
_ctx,
state
) do
# notify about possibility to cleanup as the stream is finished.
send(state.owner, {:cleanup, clean_function, stream_id})
{:ok, state}
end

@impl true
def handle_notification(notification, _element, _context, state) do
Membrane.Logger.warn("Unexpected notification: #{inspect(notification)}. Ignoring.")
Expand All @@ -66,7 +107,14 @@ if Enum.all?(
File.rm_rf(directory)
File.mkdir_p!(directory)

spec = hls_links_and_children(link_builder, track.encoding, track_id, track.stream_id)
spec =
hls_links_and_children(
link_builder,
track.encoding,
track_id,
track.stream_id,
state.framerate
)

{spec, state} =
if MapSet.member?(state.stream_ids, track.stream_id) do
Expand All @@ -93,7 +141,7 @@ if Enum.all?(
{{:ok, spec: spec}, state}
end

defp hls_links_and_children(link_builder, :OPUS, track_id, stream_id),
defp hls_links_and_children(link_builder, :OPUS, track_id, stream_id, _framerate),
do: %ParentSpec{
children: %{
{:opus_decoder, track_id} => Membrane.Opus.Decoder,
Expand All @@ -105,34 +153,34 @@ if Enum.all?(
|> to({:opus_decoder, track_id})
|> to({:aac_encoder, track_id})
|> to({:aac_parser, track_id})
|> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC])
|> via_in(Pad.ref(:input, {:audio, track_id}), options: [encoding: :AAC])
|> to({:hls_sink_bin, stream_id})
]
}

defp hls_links_and_children(link_builder, :AAC, _track_id, stream_id),
defp hls_links_and_children(link_builder, :AAC, track_id, stream_id, _framerate),
do: %ParentSpec{
children: %{},
links: [
link_builder
|> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC])
|> via_in(Pad.ref(:input, {:audio, track_id}), options: [encoding: :AAC])
|> to({:hls_sink_bin, stream_id})
]
}

defp hls_links_and_children(link_builder, :H264, track_id, stream_id),
defp hls_links_and_children(link_builder, :H264, track_id, stream_id, framerate),
do: %ParentSpec{
children: %{
{:video_parser, track_id} => %Membrane.H264.FFmpeg.Parser{
framerate: {30, 1},
framerate: {framerate, 1},
alignment: :au,
attach_nalus?: true
}
},
links: [
link_builder
|> to({:video_parser, track_id})
|> via_in(Pad.ref(:input, :video), options: [encoding: :H264])
|> via_in(Pad.ref(:input, {:video, track_id}), options: [encoding: :H264])
|> to({:hls_sink_bin, stream_id})
]
}
Expand Down
9 changes: 5 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ defmodule Membrane.RTC.Engine.MixProject do
defp deps do
[
{:membrane_core, "~> 0.8.0"},
{:membrane_webrtc_plugin, "~> 0.2.0", optional: true},
{:membrane_webrtc_plugin,
github: "membraneframework/membrane_webrtc_plugin", optional: true},
{:membrane_http_adaptive_stream_plugin, "~> 0.4.0", optional: true},
{:membrane_mp4_plugin, "~> 0.9.0", optional: true},
{:membrane_aac_plugin, "~> 0.9.0", optional: true},
{:membrane_aac_format, "~> 0.5.0", optional: true},
{:membrane_mp4_plugin, "~> 0.10.0", optional: true, override: true},
{:membrane_aac_plugin, "~> 0.11.0", optional: true},
{:membrane_aac_format, "~> 0.6.0", optional: true},
{:membrane_aac_fdk_plugin, "~> 0.9.0", optional: true},
{:membrane_element_tee, "~> 0.6.0"},
{:membrane_element_fake, "~> 0.6.0"},
Expand Down
Loading