Skip to content
This repository was archived by the owner on Sep 19, 2024. It is now read-only.

Commit 629fc0a

Browse files
authored
Fix RC in removing peers/endpoints (#108)
1 parent 7311462 commit 629fc0a

File tree

1 file changed

+36
-57
lines changed

1 file changed

+36
-57
lines changed

lib/membrane_rtc_engine/engine.ex

+36-57
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ defmodule Membrane.RTC.Engine do
896896
defp do_handle_notification(
897897
{:publish, {:new_tracks, tracks}},
898898
{:endpoint, endpoint_id},
899-
ctx,
899+
_ctx,
900900
state
901901
) do
902902
id_to_track = Map.new(tracks, &{&1.id, &1})
@@ -908,14 +908,7 @@ defmodule Membrane.RTC.Engine do
908908
&Map.merge(&1, id_to_track)
909909
)
910910

911-
tracks_msgs =
912-
do_publish(
913-
state.endpoints,
914-
ctx,
915-
{:new_tracks, tracks},
916-
{:endpoint, endpoint_id},
917-
state
918-
)
911+
tracks_msgs = do_publish({:new_tracks, tracks}, {:endpoint, endpoint_id}, state)
919912

920913
endpoint = get_in(state, [:endpoints, endpoint_id])
921914
track_id_to_track_metadata = Endpoint.get_active_track_metadata(endpoint)
@@ -945,14 +938,7 @@ defmodule Membrane.RTC.Engine do
945938
&Map.merge(&1, id_to_track)
946939
)
947940

948-
tracks_msgs =
949-
do_publish(
950-
state.endpoints,
951-
ctx,
952-
{:remove_tracks, tracks},
953-
{:endpoint, endpoint_id},
954-
state
955-
)
941+
tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, endpoint_id}, state)
956942

957943
track_ids = Enum.map(tracks, & &1.id)
958944

@@ -1197,8 +1183,6 @@ defmodule Membrane.RTC.Engine do
11971183

11981184
{_peer, state} = pop_in(state, [:peers, peer_id])
11991185
{_status, actions, state} = do_remove_endpoint(peer_id, ctx, state)
1200-
{_waiting, state} = pop_in(state, [:waiting_for_linking, peer_id])
1201-
{_subscriptions, state} = pop_in(state, [:subscriptions, peer_id])
12021186
{:present, actions, state}
12031187
else
12041188
{:absent, [], state}
@@ -1208,16 +1192,11 @@ defmodule Membrane.RTC.Engine do
12081192
defp do_remove_endpoint(peer_id, ctx, state) do
12091193
if Map.has_key?(state.endpoints, peer_id) do
12101194
{endpoint, state} = pop_in(state, [:endpoints, peer_id])
1195+
{_waiting, state} = pop_in(state, [:waiting_for_linking, peer_id])
1196+
{_subscriptions, state} = pop_in(state, [:subscriptions, peer_id])
12111197
tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true})
12121198

1213-
tracks_msgs =
1214-
do_publish(
1215-
state.endpoints,
1216-
ctx,
1217-
{:remove_tracks, tracks},
1218-
{:endpoint, peer_id},
1219-
state
1220-
)
1199+
tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, peer_id}, state)
12211200

12221201
endpoint_bin = ctx.children[{:endpoint, peer_id}]
12231202

@@ -1255,46 +1234,46 @@ defmodule Membrane.RTC.Engine do
12551234
|> Enum.filter(&Map.has_key?(ctx.children, &1))
12561235
end
12571236

1258-
defp do_publish(_endpoints, _ctx, {_, []} = _tracks, _endpoint_bin, _state), do: []
1237+
defp do_publish({_, []} = _tracks, _endpoint_bin, _state), do: []
12591238

1260-
defp do_publish(endpoints, ctx, {:new_tracks, _tracks} = msg, endpoint_bin_name, _state) do
1261-
flat_map_children(ctx, fn
1262-
{:endpoint, endpoint_id} = other_endpoint_bin ->
1263-
endpoint = Map.get(endpoints, endpoint_id)
1239+
defp do_publish({:new_tracks, _tracks} = msg, endpoint_bin_name, state) do
1240+
Enum.flat_map(state.endpoints, fn {endpoint_id, endpoint} ->
1241+
current_endpoint_bin_name = {:endpoint, endpoint_id}
12641242

1265-
if other_endpoint_bin != endpoint_bin_name and not is_nil(endpoint) do
1266-
[forward: {other_endpoint_bin, msg}]
1267-
else
1268-
[]
1269-
end
1270-
1271-
_child ->
1243+
if current_endpoint_bin_name != endpoint_bin_name and not is_nil(endpoint) do
1244+
[forward: {current_endpoint_bin_name, msg}]
1245+
else
12721246
[]
1247+
end
12731248
end)
12741249
end
12751250

1276-
defp do_publish(endpoints, ctx, {type, tracks}, endpoint_bin_name, state) do
1277-
flat_map_children(ctx, fn
1278-
{:endpoint, endpoint_id} = other_endpoint_bin ->
1279-
endpoint = Map.get(endpoints, endpoint_id)
1251+
defp do_publish({:remove_tracks, tracks}, endpoint_bin_name, state) do
1252+
Enum.flat_map(state.endpoints, fn {endpoint_id, endpoint} ->
1253+
current_endpoint_bin_name = {:endpoint, endpoint_id}
12801254

1281-
has_subscription_on_track = fn track_id ->
1282-
state.subscriptions
1283-
|> Map.fetch!(endpoint_id)
1284-
|> Map.has_key?(track_id)
1285-
end
1255+
has_subscription_on_track = fn track_id ->
1256+
state.subscriptions
1257+
|> Map.fetch!(endpoint_id)
1258+
|> Map.has_key?(track_id)
1259+
end
12861260

1287-
new_tracks = Enum.filter(tracks, &has_subscription_on_track.(&1.id))
1288-
msg = {type, new_tracks}
1261+
tracks_to_remove = Enum.filter(tracks, &has_subscription_on_track.(&1.id))
1262+
msg = {:remove_tracks, tracks_to_remove}
12891263

1290-
if other_endpoint_bin != endpoint_bin_name and not is_nil(endpoint) do
1291-
[forward: {other_endpoint_bin, msg}]
1292-
else
1293-
[]
1294-
end
1295-
1296-
_child ->
1264+
if current_endpoint_bin_name != endpoint_bin_name and not is_nil(endpoint) do
1265+
[forward: {current_endpoint_bin_name, msg}]
1266+
else
12971267
[]
1268+
end
12981269
end)
12991270
end
1271+
1272+
defp do_publish(msg, _endpoint_bin_name, _state) do
1273+
Membrane.Logger.warn(
1274+
"Requested unknown message type to be published by RTC Engine #{inspect(msg)}. Ignoring."
1275+
)
1276+
1277+
[]
1278+
end
13001279
end

0 commit comments

Comments
 (0)