@@ -165,10 +165,10 @@ defmodule Membrane.RTC.Engine do
165
165
Where `caps` are `t:Membrane.Caps.t/0` or `:any`.
166
166
167
167
* publish for some tracks using actions `t:publish_action_t/0` and subscribe for some tracks using
168
- function `#{ inspect ( __MODULE__ ) } .subscribe/3 `. The first will cause RTC Engine to send a message in
168
+ function `#{ inspect ( __MODULE__ ) } .subscribe/5 `. The first will cause RTC Engine to send a message in
169
169
form of `{:new_tracks, tracks}` where `tracks` is a list of `t:#{ inspect ( __MODULE__ ) } .Track.t/0` to all other Endpoints.
170
170
When an Endpoint receives such a message it can subscribe for new tracks by
171
- using `#{ inspect ( __MODULE__ ) } .subscribe/3 ` function. An Endpoint will be notified about track readiness
171
+ using `#{ inspect ( __MODULE__ ) } .subscribe/5 ` function. An Endpoint will be notified about track readiness
172
172
it subscribed for in `c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added`
173
173
callback can look like this
174
174
@@ -240,6 +240,14 @@ defmodule Membrane.RTC.Engine do
240
240
node: node ( )
241
241
]
242
242
243
+ @ typedoc """
244
+ Subscription options.
245
+
246
+ * `default_simulcast_encoding` - initial encoding that
247
+ endpoint making subscription wants to receive
248
+ """
249
+ @ type subscription_opts_t ( ) :: [ default_simulcast_encoding: String . t ( ) ]
250
+
243
251
@ typedoc """
244
252
Membrane action that will cause RTC Engine to publish some message to all other endpoints.
245
253
"""
@@ -406,8 +414,6 @@ defmodule Membrane.RTC.Engine do
406
414
:ok
407
415
end
408
416
409
- @ type subscription_opts_t ( ) :: [ default_simulcast_encoding: String . t ( ) ]
410
-
411
417
@ doc """
412
418
Subscribes endpoint for tracks.
413
419
@@ -588,14 +594,12 @@ defmodule Membrane.RTC.Engine do
588
594
endpoint_id: endpoint_id ,
589
595
track_id: track_id ,
590
596
format: format ,
591
- opts: opts ,
592
- status: :created
597
+ opts: opts
593
598
}
594
599
595
600
case check_subscription ( subscription , state ) do
596
601
:ok ->
597
602
{ links , state } = try_fulfill_subscription ( subscription , ctx , state )
598
-
599
603
parent_spec = % ParentSpec { links: links , log_metadata: [ rtc: state . id ] }
600
604
send ( endpoint_pid , { ref , :ok } )
601
605
{ { :ok , [ spec: parent_spec ] } , state }
@@ -639,34 +643,6 @@ defmodule Membrane.RTC.Engine do
639
643
{ :ok , state }
640
644
end
641
645
642
- defp check_subscription ( subscription , state ) do
643
- # checks whether subscription is proper
644
- track =
645
- state . endpoints
646
- |> Map . values ( )
647
- |> Enum . flat_map ( & Endpoint . get_tracks / 1 )
648
- |> Map . new ( & { & 1 . id , & 1 } )
649
- |> Map . get ( subscription . track_id )
650
-
651
- default_simulcast_encoding = subscription . opts [ :default_simulcast_encoding ]
652
-
653
- cond do
654
- track == nil ->
655
- { :error , :invalid_track_id }
656
-
657
- subscription . format not in track . format ->
658
- { :error , :invalid_format }
659
-
660
- # TODO maybe simulcast_encodings should be always a list
661
- default_simulcast_encoding != nil and track . simulcast_encodings != nil and
662
- default_simulcast_encoding not in track . simulcast_encodings ->
663
- { :error , :invalid_default_simulcast_encoding }
664
-
665
- true ->
666
- :ok
667
- end
668
- end
669
-
670
646
defp handle_media_event ( % { type: :join , data: data } , peer_id , _ctx , state ) do
671
647
peer = Peer . new ( peer_id , data . metadata || % { } )
672
648
dispatch ( % Message.NewPeer { rtc_engine: self ( ) , peer: peer } )
@@ -995,45 +971,61 @@ defmodule Membrane.RTC.Engine do
995
971
{ endpoint_to_tee_links ++ links , state }
996
972
end
997
973
974
+ defp check_subscription ( subscription , state ) do
975
+ # checks whether subscription is correct
976
+ track = get_track ( subscription . track_id , state . endpoints )
977
+ default_simulcast_encoding = subscription . opts [ :default_simulcast_encoding ]
978
+
979
+ cond do
980
+ track == nil ->
981
+ { :error , :invalid_track_id }
982
+
983
+ subscription . format not in track . format ->
984
+ { :error , :invalid_format }
985
+
986
+ # TODO maybe simulcast_encodings should be always a list
987
+ default_simulcast_encoding != nil and track . simulcast_encodings != nil and
988
+ default_simulcast_encoding not in track . simulcast_encodings ->
989
+ { :error , :invalid_default_simulcast_encoding }
990
+
991
+ true ->
992
+ :ok
993
+ end
994
+ end
995
+
998
996
defp try_fulfill_subscription ( subscription , ctx , state ) do
997
+ # if tee for this track is already spawned, fulfill subscription
998
+ # otherwise, save subscription as pending, we will fulfill it
999
+ # when tee appears
999
1000
if Map . has_key? ( ctx . children , { :tee , subscription . track_id } ) do
1000
1001
fulfill_subscription ( subscription , ctx , state )
1001
1002
else
1002
- subscription = % Subscription { subscription | status: :pending }
1003
1003
state = update_in ( state , [ :pending_subscriptions ] , & [ subscription | & 1 ] )
1004
1004
{ [ ] , state }
1005
1005
end
1006
1006
end
1007
1007
1008
1008
defp fulfill_subscription ( % Subscription { format: :raw } = s , ctx , state ) do
1009
- links =
1009
+ raw_format_links =
1010
1010
if Map . has_key? ( ctx . children , { :raw_format_tee , s . track_id } ) do
1011
1011
[ ]
1012
1012
else
1013
1013
prepare_raw_format_links ( s . track_id , state )
1014
- end ++
1015
- prepare_track_to_endpoint_links ( s , :raw_format_tee , state )
1014
+ end
1016
1015
1017
- state =
1018
- update_in (
1019
- state ,
1020
- [ :subscriptions , s . endpoint_id ] ,
1021
- & Map . put ( & 1 , s . track_id , % Subscription { s | status: :active } )
1022
- )
1016
+ { links , state } = do_fulfill_subscription ( s , :raw_format_tee , state )
1023
1017
1024
- { links , state }
1018
+ { raw_format_links ++ links , state }
1025
1019
end
1026
1020
1027
- defp fulfill_subscription ( s , _ctx , state ) do
1028
- links = prepare_track_to_endpoint_links ( s , :tee , state )
1029
-
1030
- state =
1031
- update_in (
1032
- state ,
1033
- [ :subscriptions , s . endpoint_id ] ,
1034
- & Map . put ( & 1 , s . track_id , % Subscription { s | status: :active } )
1035
- )
1021
+ defp fulfill_subscription ( % Subscription { format: _remote_format } = s , _ctx , state ) do
1022
+ do_fulfill_subscription ( s , :tee , state )
1023
+ end
1036
1024
1025
+ defp do_fulfill_subscription ( s , tee_kind , state ) do
1026
+ links = prepare_track_to_endpoint_links ( s , tee_kind , state )
1027
+ s = % Subscription { s | status: :active }
1028
+ state = update_in ( state , [ :subscriptions , s . endpoint_id ] , & Map . put ( & 1 , s . track_id , s ) )
1037
1029
{ links , state }
1038
1030
end
1039
1031
@@ -1046,12 +1038,9 @@ defmodule Membrane.RTC.Engine do
1046
1038
end
1047
1039
1048
1040
defp prepare_track_to_endpoint_links ( subscription , :tee , state ) do
1049
- track =
1050
- state . endpoints
1051
- |> Map . values ( )
1052
- |> Enum . flat_map ( & Endpoint . get_tracks / 1 )
1053
- |> Map . new ( & { & 1 . id , & 1 } )
1054
- |> Map . get ( subscription . track_id )
1041
+ # if someone subscribed for simulcast track, prepare options
1042
+ # for SimulcastTee
1043
+ track = get_track ( subscription . track_id , state . endpoints )
1055
1044
1056
1045
options =
1057
1046
if track . simulcast_encodings != nil do
@@ -1142,6 +1131,14 @@ defmodule Membrane.RTC.Engine do
1142
1131
defp get_outbound_tracks ( endpoints ) ,
1143
1132
do: Enum . flat_map ( endpoints , fn { _id , endpoint } -> Endpoint . get_tracks ( endpoint ) end )
1144
1133
1134
+ defp get_track ( track_id , endpoints ) do
1135
+ endpoints
1136
+ |> Map . values ( )
1137
+ |> Enum . flat_map ( & Endpoint . get_tracks / 1 )
1138
+ |> Map . new ( & { & 1 . id , & 1 } )
1139
+ |> Map . get ( track_id )
1140
+ end
1141
+
1145
1142
defp handle_remove_peer ( peer_id , reason , ctx , state ) do
1146
1143
case do_remove_peer ( peer_id , reason , ctx , state ) do
1147
1144
{ :absent , [ ] , state } ->
@@ -1175,28 +1172,27 @@ defmodule Membrane.RTC.Engine do
1175
1172
end
1176
1173
end
1177
1174
1178
- # TODO `peer_id` -> `endpoint_id`
1179
- defp do_remove_endpoint ( peer_id , ctx , state ) do
1180
- if Map . has_key? ( state . endpoints , peer_id ) do
1181
- { endpoint , state } = pop_in ( state , [ :endpoints , peer_id ] )
1175
+ defp do_remove_endpoint ( endpoint_id , ctx , state ) do
1176
+ if Map . has_key? ( state . endpoints , endpoint_id ) do
1177
+ { endpoint , state } = pop_in ( state , [ : endpoints, endpoint_id ] )
1178
+ { _subscriptions , state } = pop_in ( state , [ :subscriptions , endpoint_id ] )
1182
1179
1183
1180
state =
1184
- update_in ( state , [ :waiting_subscriptions , peer_id ] , fn subscriptions ->
1185
- Enum . filter ( subscriptions , fn s -> s . endpoint_id != peer_id end )
1181
+ update_in ( state , [ :pending_subscriptions , endpoint_id ] , fn subscriptions ->
1182
+ Enum . filter ( subscriptions , fn s -> s . endpoint_id != endpoint_id end )
1186
1183
end )
1187
1184
1188
- { _subscriptions , state } = pop_in ( state , [ :subscriptions , peer_id ] )
1189
1185
tracks = Enum . map ( Endpoint . get_tracks ( endpoint ) , & % Track { & 1 | active?: true } )
1190
1186
1191
- tracks_msgs = do_publish ( { :remove_tracks , tracks } , { :endpoint , peer_id } , state )
1187
+ tracks_msgs = do_publish ( { :remove_tracks , tracks } , { :endpoint , endpoint_id } , state )
1192
1188
1193
- endpoint_bin = ctx . children [ { :endpoint , peer_id } ]
1189
+ endpoint_bin = ctx . children [ { :endpoint , endpoint_id } ]
1194
1190
1195
1191
actions =
1196
1192
if endpoint_bin == nil or endpoint_bin . terminating? do
1197
1193
[ ]
1198
1194
else
1199
- [ remove_child: find_children_for_endpoint ( endpoint , peer_id , ctx ) ]
1195
+ [ remove_child: find_children_for_endpoint ( endpoint , endpoint_id , ctx ) ]
1200
1196
end
1201
1197
1202
1198
{ :present , tracks_msgs ++ actions , state }
@@ -1205,13 +1201,13 @@ defmodule Membrane.RTC.Engine do
1205
1201
end
1206
1202
end
1207
1203
1208
- defp find_children_for_endpoint ( endpoint , peer_id , ctx ) do
1204
+ defp find_children_for_endpoint ( endpoint , endpoint_id , ctx ) do
1209
1205
children =
1210
1206
endpoint
1211
1207
|> Endpoint . get_tracks ( )
1212
- |> Enum . flat_map ( fn track -> get_track_elements ( peer_id , track . id , ctx ) end )
1208
+ |> Enum . flat_map ( fn track -> get_track_elements ( endpoint_id , track . id , ctx ) end )
1213
1209
1214
- [ endpoint: peer_id ] ++ children
1210
+ [ endpoint: endpoint_id ] ++ children
1215
1211
end
1216
1212
1217
1213
defp get_track_elements ( endpoint_id , track_id , ctx ) do
0 commit comments