Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rtcp synchronization #20

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ defmodule RecordingConverter.Pipeline do
depayloader: Membrane.RTP.H264.Depayloader,
clock_rate: track["clock_rate"]
})
|> child({:rtcp_sync, track.id}, %RecordingConverter.RTCPSynchronizer{
clock_rate: track["clock_rate"]
})
|> child({:input_parser, track.id}, %Membrane.H264.Parser{
output_alignment: :nalu,
output_stream_structure: :annexb
Expand Down
44 changes: 44 additions & 0 deletions lib/report_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule RecordingConverter.ReportParser do
calculate_duration_in_ns(track) < Compositor.avatar_threshold_ns()
end)
|> Enum.map(fn {key, value} -> Map.put(value, :id, key) end)
|> recalculate_offsets()
end

@spec get_all_track_actions(tracks :: list()) :: list()
Expand Down Expand Up @@ -153,4 +154,47 @@ defmodule RecordingConverter.ReportParser do
nil
end
end

# Not every track will have a `start_timestamp_wallclock` value since this requires an RTCP sender packet.
# For this reason, the algorithm does not override track offsets lacking a `start_timestamp_wallclock`.
# However, for tracks that do come with a `start_timestamp_wallclock` value
# the algorithm recalculates the offset using the following formula:
# new_offset = ft.offset + (ct.start_timestamp_wallclock - ft.start_timstamp_wallclock)
# where:
# * ft - first track that have `start_timestamp_wallclock` value set
# * ct - current track for wchich we calculate new offset
defp recalculate_offsets(tracks) do
{tracks, _acc} =
tracks
|> Enum.sort_by(fn track -> track["offset"] end)
|> Enum.map_reduce(nil, fn track, acc ->
cond do
not Map.has_key?(track, "start_timestamp_wallclock") ->
{track, acc}

is_nil(acc) ->
{track, track}

true ->
offset =
acc["offset"] + track["start_timestamp_wallclock"] -
acc["start_timestamp_wallclock"]

{%{track | "offset" => trunc(offset)}, acc}
end
end)

%{"offset" => first_offset} =
Enum.min_by(tracks, fn track -> track["offset"] end, fn -> %{"offset" => 0} end)

if first_offset > 0,
do:
raise("The lower track offset is #{first_offset}, this offset cannot be greater than 0.")

# After RTCP synchronization, tracks can switch places.
# For example, a track that was second before synchronization can now be first.
# In this case, it will have a negative offset and we will need to correct it to 0.
# We also need to correct all other offsets to maintain the correct offsets between tracks.
Enum.map(tracks, fn track -> Map.update!(track, "offset", &(&1 - first_offset)) end)
end
end
78 changes: 78 additions & 0 deletions lib/rtcp_synchronizer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule RecordingConverter.RTCPSynchronizer do
@moduledoc false

use Membrane.Filter

alias Membrane.Buffer
alias Membrane.RTCP.SenderReportPacket
alias Membrane.RTCPEvent

@sec_to_ns 10 ** 9

def_input_pad :input, accepted_format: Membrane.H264

def_output_pad :output, accepted_format: Membrane.H264

def_options clock_rate: [
spec: pos_integer()
]

@impl true
def handle_init(_ctx, %{clock_rate: clock_rate}) do
{[], %{clock_rate: clock_rate, queue: :queue.new(), first_sender_info: nil, offset: 0}}
end

@impl true
def handle_event(
_pad,
%RTCPEvent{rtcp: %SenderReportPacket{sender_info: sender_info}},
_ctx,
%{first_sender_info: nil, queue: queue} = state
) do
queue = :queue.in({sender_info.rtp_timestamp, 0}, queue)
{[], %{state | first_sender_info: sender_info, queue: queue}}
end

@impl true
def handle_event(
_pad,
%RTCPEvent{rtcp: %SenderReportPacket{sender_info: sender_info}},
_ctx,
%{clock_rate: clock_rate, queue: queue} = state
) do
offset = timestamp(sender_info, clock_rate) - timestamp(state.first_sender_info, clock_rate)
queue = :queue.in({sender_info.rtp_timestamp, offset}, queue)

{[], %{state | queue: queue}}
end

@impl true
def handle_event(pad, event, context, state) do
super(pad, event, context, state)
end

@impl true
def handle_buffer(
_pad,
%Buffer{} = buffer,
_ctx,
%{queue: queue, offset: offset} = state
) do
{offset, queue} =
if :queue.is_empty(queue),
do: {offset, queue},
else: maybe_update_offset(queue, buffer.metadata.rtp.timestamp, offset)

buffer = Map.update!(buffer, :pts, &(&1 + offset))
{[buffer: {:output, buffer}], %{state | queue: queue, offset: offset}}
end

defp maybe_update_offset(queue, timestamp, offset) do
{{:value, {next_offset_timestamp, next_offset}}, updated_queue} = :queue.out(queue)
if next_offset_timestamp < timestamp, do: {next_offset, updated_queue}, else: {offset, queue}
end

defp timestamp(%{rtp_timestamp: timestamp, wallclock_timestamp: wallclock}, clock_rate) do
trunc(wallclock - timestamp / clock_rate * @sec_to_ns)
end
end
Loading