Skip to content

Commit

Permalink
Merge pull request #20 from fishjam-dev/refactor-recording-synchroniz…
Browse files Browse the repository at this point in the history
…ation

Refactor rtcp synchronization
  • Loading branch information
Karolk99 authored Jun 6, 2024
2 parents 3cba999 + e9a3c3e commit a17b991
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 7 deletions.
3 changes: 3 additions & 0 deletions lib/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ defmodule RecordingConverter.Pipeline do
depayloader: Membrane.RTP.H264.Depayloader,
clock_rate: track["clock_rate"]
})
|> child({:rtcp_sync, track.id}, %RecordingConverter.RTCPSynchronizer{
clock_rate: track["clock_rate"]
})
|> child({:input_parser, track.id}, %Membrane.H264.Parser{
output_alignment: :nalu,
output_stream_structure: :annexb
Expand Down
44 changes: 44 additions & 0 deletions lib/report_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule RecordingConverter.ReportParser do
calculate_duration_in_ns(track) < Compositor.avatar_threshold_ns()
end)
|> Enum.map(fn {key, value} -> Map.put(value, :id, key) end)
|> recalculate_offsets()
end

@spec get_all_track_actions(tracks :: list()) :: list()
Expand Down Expand Up @@ -153,4 +154,47 @@ defmodule RecordingConverter.ReportParser do
nil
end
end

# Not every track will have a `start_timestamp_wallclock` value since this requires an RTCP sender packet.
# For this reason, the algorithm does not override track offsets lacking a `start_timestamp_wallclock`.
# However, for tracks that do come with a `start_timestamp_wallclock` value
# the algorithm recalculates the offset using the following formula:
# new_offset = ft.offset + (ct.start_timestamp_wallclock - ft.start_timstamp_wallclock)
# where:
# * ft - first track that have `start_timestamp_wallclock` value set
# * ct - current track for wchich we calculate new offset
defp recalculate_offsets(tracks) do
{tracks, _acc} =
tracks
|> Enum.sort_by(fn track -> track["offset"] end)
|> Enum.map_reduce(nil, fn track, acc ->
cond do
not Map.has_key?(track, "start_timestamp_wallclock") ->
{track, acc}

is_nil(acc) ->
{track, track}

true ->
offset =
acc["offset"] + track["start_timestamp_wallclock"] -
acc["start_timestamp_wallclock"]

{%{track | "offset" => trunc(offset)}, acc}
end
end)

%{"offset" => first_offset} =
Enum.min_by(tracks, fn track -> track["offset"] end, fn -> %{"offset" => 0} end)

if first_offset > 0,
do:
raise("The lower track offset is #{first_offset}, this offset cannot be greater than 0.")

# After RTCP synchronization, tracks can switch places.
# For example, a track that was second before synchronization can now be first.
# In this case, it will have a negative offset and we will need to correct it to 0.
# We also need to correct all other offsets to maintain the correct offsets between tracks.
Enum.map(tracks, fn track -> Map.update!(track, "offset", &(&1 - first_offset)) end)
end
end
78 changes: 78 additions & 0 deletions lib/rtcp_synchronizer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule RecordingConverter.RTCPSynchronizer do
@moduledoc false

use Membrane.Filter

alias Membrane.Buffer
alias Membrane.RTCP.SenderReportPacket
alias Membrane.RTCPEvent

@sec_to_ns 10 ** 9

def_input_pad :input, accepted_format: Membrane.H264

def_output_pad :output, accepted_format: Membrane.H264

def_options clock_rate: [
spec: pos_integer()
]

@impl true
def handle_init(_ctx, %{clock_rate: clock_rate}) do
{[], %{clock_rate: clock_rate, queue: :queue.new(), first_sender_info: nil, offset: 0}}
end

@impl true
def handle_event(
_pad,
%RTCPEvent{rtcp: %SenderReportPacket{sender_info: sender_info}},
_ctx,
%{first_sender_info: nil, queue: queue} = state
) do
queue = :queue.in({sender_info.rtp_timestamp, 0}, queue)
{[], %{state | first_sender_info: sender_info, queue: queue}}
end

@impl true
def handle_event(
_pad,
%RTCPEvent{rtcp: %SenderReportPacket{sender_info: sender_info}},
_ctx,
%{clock_rate: clock_rate, queue: queue} = state
) do
offset = timestamp(sender_info, clock_rate) - timestamp(state.first_sender_info, clock_rate)
queue = :queue.in({sender_info.rtp_timestamp, offset}, queue)

{[], %{state | queue: queue}}
end

@impl true
def handle_event(pad, event, context, state) do
super(pad, event, context, state)
end

@impl true
def handle_buffer(
_pad,
%Buffer{} = buffer,
_ctx,
%{queue: queue, offset: offset} = state
) do
{offset, queue} =
if :queue.is_empty(queue),
do: {offset, queue},
else: maybe_update_offset(queue, buffer.metadata.rtp.timestamp, offset)

buffer = Map.update!(buffer, :pts, &(&1 + offset))
{[buffer: {:output, buffer}], %{state | queue: queue, offset: offset}}
end

defp maybe_update_offset(queue, timestamp, offset) do
{{:value, {next_offset_timestamp, next_offset}}, updated_queue} = :queue.out(queue)
if next_offset_timestamp < timestamp, do: {next_offset, updated_queue}, else: {offset, queue}
end

defp timestamp(%{rtp_timestamp: timestamp, wallclock_timestamp: wallclock}, clock_rate) do
trunc(wallclock - timestamp / clock_rate * @sec_to_ns)
end
end
Loading

0 comments on commit a17b991

Please sign in to comment.