@@ -165,10 +165,10 @@ defmodule Membrane.RTC.Engine do
165
165
Where `caps` are `t:Membrane.Caps.t/0` or `:any`.
166
166
167
167
* publish for some tracks using actions `t:publish_action_t/0` and subscribe for some tracks using
168
- function `#{ inspect ( __MODULE__ ) } .subscribe/3 `. The first will cause RTC Engine to send a message in
168
+ function `#{ inspect ( __MODULE__ ) } .subscribe/5 `. The first will cause RTC Engine to send a message in
169
169
form of `{:new_tracks, tracks}` where `tracks` is a list of `t:#{ inspect ( __MODULE__ ) } .Track.t/0` to all other Endpoints.
170
170
When an Endpoint receives such a message it can subscribe for new tracks by
171
- using `#{ inspect ( __MODULE__ ) } .subscribe/3 ` function. An Endpoint will be notified about track readiness
171
+ using `#{ inspect ( __MODULE__ ) } .subscribe/5 ` function. An Endpoint will be notified about track readiness
172
172
it subscribed for in `c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added`
173
173
callback can look like this
174
174
@@ -240,6 +240,14 @@ defmodule Membrane.RTC.Engine do
240
240
node: node ( )
241
241
]
242
242
243
+ @ typedoc """
244
+ Subscription options.
245
+
246
+ * `default_simulcast_encoding` - initial encoding that
247
+ endpoint making subscription wants to receive
248
+ """
249
+ @ type subscription_opts_t ( ) :: [ default_simulcast_encoding: String . t ( ) ]
250
+
243
251
@ typedoc """
244
252
Membrane action that will cause RTC Engine to publish some message to all other endpoints.
245
253
"""
@@ -406,8 +414,6 @@ defmodule Membrane.RTC.Engine do
406
414
:ok
407
415
end
408
416
409
- @ type subscription_opts_t ( ) :: [ default_simulcast_encoding: String . t ( ) ]
410
-
411
417
@ doc """
412
418
Subscribes endpoint for tracks.
413
419
@@ -588,14 +594,12 @@ defmodule Membrane.RTC.Engine do
588
594
endpoint_id: endpoint_id ,
589
595
track_id: track_id ,
590
596
format: format ,
591
- opts: opts ,
592
- status: :created
597
+ opts: opts
593
598
}
594
599
595
600
case check_subscription ( subscription , state ) do
596
601
:ok ->
597
602
{ links , state } = try_fulfill_subscription ( subscription , ctx , state )
598
-
599
603
parent_spec = % ParentSpec { links: links , log_metadata: [ rtc: state . id ] }
600
604
send ( endpoint_pid , { ref , :ok } )
601
605
{ { :ok , [ spec: parent_spec ] } , state }
@@ -639,34 +643,6 @@ defmodule Membrane.RTC.Engine do
639
643
{ :ok , state }
640
644
end
641
645
642
- defp check_subscription ( subscription , state ) do
643
- # checks whether subscription is proper
644
- track =
645
- state . endpoints
646
- |> Map . values ( )
647
- |> Enum . flat_map ( & Endpoint . get_tracks / 1 )
648
- |> Map . new ( & { & 1 . id , & 1 } )
649
- |> Map . get ( subscription . track_id )
650
-
651
- default_simulcast_encoding = subscription . opts [ :default_simulcast_encoding ]
652
-
653
- cond do
654
- track == nil ->
655
- { :error , :invalid_track_id }
656
-
657
- subscription . format not in track . format ->
658
- { :error , :invalid_format }
659
-
660
- # TODO maybe simulcast_encodings should be always a list
661
- default_simulcast_encoding != nil and track . simulcast_encodings != nil and
662
- default_simulcast_encoding not in track . simulcast_encodings ->
663
- { :error , :invalid_default_simulcast_encoding }
664
-
665
- true ->
666
- :ok
667
- end
668
- end
669
-
670
646
defp handle_media_event ( % { type: :join , data: data } , peer_id , _ctx , state ) do
671
647
peer = Peer . new ( peer_id , data . metadata || % { } )
672
648
dispatch ( % Message.NewPeer { rtc_engine: self ( ) , peer: peer } )
@@ -995,45 +971,67 @@ defmodule Membrane.RTC.Engine do
995
971
{ endpoint_to_tee_links ++ links , state }
996
972
end
997
973
974
+ defp check_subscription ( subscription , state ) do
975
+ # checks whether subscription is correct
976
+ track =
977
+ state . endpoints
978
+ |> Map . values ( )
979
+ |> Enum . flat_map ( & Endpoint . get_tracks / 1 )
980
+ |> Map . new ( & { & 1 . id , & 1 } )
981
+ |> Map . get ( subscription . track_id )
982
+
983
+ default_simulcast_encoding = subscription . opts [ :default_simulcast_encoding ]
984
+
985
+ cond do
986
+ track == nil ->
987
+ { :error , :invalid_track_id }
988
+
989
+ subscription . format not in track . format ->
990
+ { :error , :invalid_format }
991
+
992
+ # TODO maybe simulcast_encodings should be always a list
993
+ default_simulcast_encoding != nil and track . simulcast_encodings != nil and
994
+ default_simulcast_encoding not in track . simulcast_encodings ->
995
+ { :error , :invalid_default_simulcast_encoding }
996
+
997
+ true ->
998
+ :ok
999
+ end
1000
+ end
1001
+
998
1002
defp try_fulfill_subscription ( subscription , ctx , state ) do
1003
+ # if tee for this track is already spawned, fulfill subscription
1004
+ # otherwise, save subscription as pending, we will fulfill it
1005
+ # when tee appears
999
1006
if Map . has_key? ( ctx . children , { :tee , subscription . track_id } ) do
1000
1007
fulfill_subscription ( subscription , ctx , state )
1001
1008
else
1002
- subscription = % Subscription { subscription | status: :pending }
1003
1009
state = update_in ( state , [ :pending_subscriptions ] , & [ subscription | & 1 ] )
1004
1010
{ [ ] , state }
1005
1011
end
1006
1012
end
1007
1013
1008
1014
defp fulfill_subscription ( % Subscription { format: :raw } = s , ctx , state ) do
1009
- links =
1015
+ raw_format_links =
1010
1016
if Map . has_key? ( ctx . children , { :raw_format_tee , s . track_id } ) do
1011
1017
[ ]
1012
1018
else
1013
1019
prepare_raw_format_links ( s . track_id , state )
1014
- end ++
1015
- prepare_track_to_endpoint_links ( s , :raw_format_tee , state )
1020
+ end
1016
1021
1017
- state =
1018
- update_in (
1019
- state ,
1020
- [ :subscriptions , s . endpoint_id ] ,
1021
- & Map . put ( & 1 , s . track_id , % Subscription { s | status: :active } )
1022
- )
1022
+ { links , state } = do_fulfill_subscription ( s , :raw_format_tee , state )
1023
1023
1024
- { links , state }
1024
+ { raw_format_links ++ links , state }
1025
1025
end
1026
1026
1027
- defp fulfill_subscription ( s , _ctx , state ) do
1028
- links = prepare_track_to_endpoint_links ( s , :tee , state )
1029
-
1030
- state =
1031
- update_in (
1032
- state ,
1033
- [ :subscriptions , s . endpoint_id ] ,
1034
- & Map . put ( & 1 , s . track_id , % Subscription { s | status: :active } )
1035
- )
1027
+ defp fulfill_subscription ( % Subscription { format: _remote_format } = s , _ctx , state ) do
1028
+ do_fulfill_subscription ( s , :tee , state )
1029
+ end
1036
1030
1031
+ defp do_fulfill_subscription ( s , tee_kind , state ) do
1032
+ links = prepare_track_to_endpoint_links ( s , tee_kind , state )
1033
+ s = % Subscription { s | status: :active }
1034
+ state = update_in ( state , [ :subscriptions , s . endpoint_id ] , & Map . put ( & 1 , s . track_id , s ) )
1037
1035
{ links , state }
1038
1036
end
1039
1037
@@ -1046,6 +1044,8 @@ defmodule Membrane.RTC.Engine do
1046
1044
end
1047
1045
1048
1046
defp prepare_track_to_endpoint_links ( subscription , :tee , state ) do
1047
+ # if someone subscribed for simulcast track, prepare options
1048
+ # for SimulcastTee
1049
1049
track =
1050
1050
state . endpoints
1051
1051
|> Map . values ( )
@@ -1175,28 +1175,27 @@ defmodule Membrane.RTC.Engine do
1175
1175
end
1176
1176
end
1177
1177
1178
- # TODO `peer_id` -> `endpoint_id`
1179
- defp do_remove_endpoint ( peer_id , ctx , state ) do
1180
- if Map . has_key? ( state . endpoints , peer_id ) do
1181
- { endpoint , state } = pop_in ( state , [ :endpoints , peer_id ] )
1178
+ defp do_remove_endpoint ( endpoint_id , ctx , state ) do
1179
+ if Map . has_key? ( state . endpoints , endpoint_id ) do
1180
+ { endpoint , state } = pop_in ( state , [ : endpoints, endpoint_id ] )
1181
+ { _subscriptions , state } = pop_in ( state , [ :subscriptions , endpoint_id ] )
1182
1182
1183
1183
state =
1184
- update_in ( state , [ :waiting_subscriptions , peer_id ] , fn subscriptions ->
1185
- Enum . filter ( subscriptions , fn s -> s . endpoint_id != peer_id end )
1184
+ update_in ( state , [ :pending_subscriptions , endpoint_id ] , fn subscriptions ->
1185
+ Enum . filter ( subscriptions , fn s -> s . endpoint_id != endpoint_id end )
1186
1186
end )
1187
1187
1188
- { _subscriptions , state } = pop_in ( state , [ :subscriptions , peer_id ] )
1189
1188
tracks = Enum . map ( Endpoint . get_tracks ( endpoint ) , & % Track { & 1 | active?: true } )
1190
1189
1191
- tracks_msgs = do_publish ( { :remove_tracks , tracks } , { :endpoint , peer_id } , state )
1190
+ tracks_msgs = do_publish ( { :remove_tracks , tracks } , { :endpoint , endpoint_id } , state )
1192
1191
1193
- endpoint_bin = ctx . children [ { :endpoint , peer_id } ]
1192
+ endpoint_bin = ctx . children [ { :endpoint , endpoint_id } ]
1194
1193
1195
1194
actions =
1196
1195
if endpoint_bin == nil or endpoint_bin . terminating? do
1197
1196
[ ]
1198
1197
else
1199
- [ remove_child: find_children_for_endpoint ( endpoint , peer_id , ctx ) ]
1198
+ [ remove_child: find_children_for_endpoint ( endpoint , endpoint_id , ctx ) ]
1200
1199
end
1201
1200
1202
1201
{ :present , tracks_msgs ++ actions , state }
@@ -1205,13 +1204,13 @@ defmodule Membrane.RTC.Engine do
1205
1204
end
1206
1205
end
1207
1206
1208
- defp find_children_for_endpoint ( endpoint , peer_id , ctx ) do
1207
+ defp find_children_for_endpoint ( endpoint , endpoint_id , ctx ) do
1209
1208
children =
1210
1209
endpoint
1211
1210
|> Endpoint . get_tracks ( )
1212
- |> Enum . flat_map ( fn track -> get_track_elements ( peer_id , track . id , ctx ) end )
1211
+ |> Enum . flat_map ( fn track -> get_track_elements ( endpoint_id , track . id , ctx ) end )
1213
1212
1214
- [ endpoint: peer_id ] ++ children
1213
+ [ endpoint: endpoint_id ] ++ children
1215
1214
end
1216
1215
1217
1216
defp get_track_elements ( endpoint_id , track_id , ctx ) do
0 commit comments