-
-
Notifications
You must be signed in to change notification settings - Fork 51
/
__main__.py
930 lines (823 loc) · 44.5 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This file incorporates work covered by the following copyright and
# permission notice:
#
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import http.client
import json
import logging
import os
import signal
import socket
import sys
import time
import urllib.parse
import traceback
logger = logging.getLogger("main")
logger.setLevel(logging.INFO)
from watchdog.observers import Observer
from watchdog.events import FileClosedEvent, FileSystemEventHandler
from webrtc_input import WebRTCInput
from webrtc_signalling import WebRTCSignalling, WebRTCSignallingErrorNoPeer
from gstwebrtc_app import GSTWebRTCApp
from gpu_monitor import GPUMonitor
from system_monitor import SystemMonitor
from metrics import Metrics
from resize import resize_display, get_new_res, set_dpi, set_cursor_size
from signalling_web import WebRTCSimpleServer, generate_rtc_config
DEFAULT_RTC_CONFIG = """{
"lifetimeDuration": "86400s",
"iceServers": [
{
"urls": [
"stun:stun.l.google.com:19302"
]
}
],
"blockStatus": "NOT_BLOCKED",
"iceTransportPolicy": "all"
}"""
class HMACRTCMonitor:
def __init__(self, turn_host, turn_port, turn_shared_secret, turn_username, turn_protocol='udp', turn_tls=False, stun_host=None, stun_port=None, period=60, enabled=True):
self.turn_host = turn_host
self.turn_port = turn_port
self.turn_username = turn_username
self.turn_shared_secret = turn_shared_secret
self.turn_protocol = turn_protocol
self.turn_tls = turn_tls
self.stun_host = stun_host
self.stun_port = stun_port
self.period = period
self.enabled = enabled
self.running = False
self.on_rtc_config = lambda stun_servers, turn_servers, rtc_config: logger.warning("unhandled on_rtc_config")
def start(self):
if self.enabled:
self.running = True
while self.running:
if self.enabled and int(time.time()) % self.period == 0:
try:
hmac_data = generate_rtc_config(self.turn_host, self.turn_port, self.turn_shared_secret, self.turn_username, self.turn_protocol, self.turn_tls, self.stun_host, self.stun_port)
stun_servers, turn_servers, rtc_config = parse_rtc_config(hmac_data)
self.on_rtc_config(stun_servers, turn_servers, rtc_config)
except Exception as e:
logger.warning("could not fetch TURN HMAC config in periodic monitor: {}".format(e))
time.sleep(0.5)
logger.info("HMAC RTC monitor stopped")
def stop(self):
self.running = False
class RESTRTCMonitor:
def __init__(self, turn_rest_uri, turn_rest_username, turn_rest_username_auth_header, turn_protocol='udp', turn_rest_protocol_header='x-turn-protocol', turn_tls=False, turn_rest_tls_header='x-turn-tls', period=60, enabled=True):
self.period = period
self.enabled = enabled
self.running = False
self.turn_rest_uri = turn_rest_uri
self.turn_rest_username = turn_rest_username.replace(":", "-")
self.turn_rest_username_auth_header = turn_rest_username_auth_header
self.turn_protocol = turn_protocol
self.turn_rest_protocol_header = turn_rest_protocol_header
self.turn_tls = turn_tls
self.turn_rest_tls_header = turn_rest_tls_header
self.on_rtc_config = lambda stun_servers, turn_servers, rtc_config: logger.warning("unhandled on_rtc_config")
def start(self):
if self.enabled:
self.running = True
while self.running:
if self.enabled and int(time.time()) % self.period == 0:
try:
stun_servers, turn_servers, rtc_config = fetch_turn_rest(self.turn_rest_uri, self.turn_rest_username, self.turn_rest_username_auth_header, self.turn_protocol, self.turn_rest_protocol_header, self.turn_tls, self.turn_rest_tls_header)
self.on_rtc_config(stun_servers, turn_servers, rtc_config)
except Exception as e:
logger.warning("could not fetch TURN REST config in periodic monitor: {}".format(e))
time.sleep(0.5)
logger.info("TURN REST RTC monitor stopped")
def stop(self):
self.running = False
class RTCConfigFileMonitor:
def __init__(self, rtc_file, enabled=True):
self.enabled = enabled
self.running = False
self.rtc_file = rtc_file
self.on_rtc_config = lambda stun_servers, turn_servers, rtc_config: logger.warning("unhandled on_rtc_config")
self.observer = Observer()
self.file_event_handler = FileSystemEventHandler()
self.file_event_handler.on_closed = self.event_handler
self.observer.schedule(self.file_event_handler, self.rtc_file, recursive=False)
def event_handler(self, event):
if type(event) is FileClosedEvent:
print("Detected RTC JSON file change: {}".format(event.src_path))
try:
with open(self.rtc_file, 'rb') as f:
data = f.read()
stun_servers, turn_servers, rtc_config = parse_rtc_config(data)
self.on_rtc_config(stun_servers, turn_servers, rtc_config)
except Exception as e:
logger.warning("could not read RTC JSON file: {}: {}".format(self.rtc_file, e))
def start(self):
if self.enabled:
self.observer.start()
self.running = True
def stop(self):
self.observer.stop()
logger.info("RTC config file monitor stopped")
self.running = False
def make_turn_rtc_config_json_legacy(turn_host, turn_port, username, password, protocol='udp', turn_tls=False, stun_host=None, stun_port=None):
stun_list = ["stun:{}:{}".format(turn_host, turn_port)]
if stun_host is not None and stun_port is not None and (stun_host != turn_host or str(stun_port) != str(turn_port)):
stun_list.insert(0, "stun:{}:{}".format(stun_host, stun_port))
if stun_host != "stun.l.google.com" or (str(stun_port) != "19302"):
stun_list.append("stun:stun.l.google.com:19302")
rtc_config = {}
rtc_config["lifetimeDuration"] = "86400s"
rtc_config["blockStatus"] = "NOT_BLOCKED"
rtc_config["iceTransportPolicy"] = "all"
rtc_config["iceServers"] = []
rtc_config["iceServers"].append({
"urls": stun_list
})
rtc_config["iceServers"].append({
"urls": [
"{}:{}:{}?transport={}".format('turns' if turn_tls else 'turn', turn_host, turn_port, protocol)
],
"username": username,
"credential": password
})
return json.dumps(rtc_config, indent=2)
def parse_rtc_config(data):
ice_servers = json.loads(data)['iceServers']
stun_uris = []
turn_uris = []
for ice_server in ice_servers:
for url in ice_server.get("urls", []):
if url.startswith("stun:"):
stun_host = url.split(":")[1]
stun_port = url.split(":")[2].split("?")[0]
stun_uri = "stun://%s:%s" % (
stun_host,
stun_port
)
stun_uris.append(stun_uri)
elif url.startswith("turn:"):
turn_host = url.split(':')[1]
turn_port = url.split(':')[2].split('?')[0]
turn_user = ice_server['username']
turn_password = ice_server['credential']
turn_uri = "turn://%s:%s@%s:%s" % (
urllib.parse.quote(turn_user, safe=""),
urllib.parse.quote(turn_password, safe=""),
turn_host,
turn_port
)
turn_uris.append(turn_uri)
elif url.startswith("turns:"):
turn_host = url.split(':')[1]
turn_port = url.split(':')[2].split('?')[0]
turn_user = ice_server['username']
turn_password = ice_server['credential']
turn_uri = "turns://%s:%s@%s:%s" % (
urllib.parse.quote(turn_user, safe=""),
urllib.parse.quote(turn_password, safe=""),
turn_host,
turn_port
)
turn_uris.append(turn_uri)
return stun_uris, turn_uris, data
def fetch_turn_rest(uri, user, auth_header_username='x-auth-user', protocol='udp', header_protocol='x-turn-protocol', turn_tls=False, header_tls='x-turn-tls'):
"""Fetches TURN uri from a REST API
Arguments:
uri {string} -- uri of REST API service, example: http://localhost:8081/
user {string} -- username used to generate TURN credential, for example: <hostname>
Raises:
Exception -- if response http status code is >= 400
Returns:
[string] -- TURN URI used with gstwebrtcbin in the form of:
turn://<user>:<password>@<host>:<port>
NOTE that the user and password are URI encoded to escape special characters like '/'
"""
parsed_uri = urllib.parse.urlparse(uri)
conn = http.client.HTTPConnection(parsed_uri.netloc)
if parsed_uri.scheme == "https":
conn = http.client.HTTPSConnection(parsed_uri.netloc)
auth_headers = {
auth_header_username: user,
header_protocol: protocol,
header_tls: 'true' if turn_tls else 'false'
}
conn.request("GET", parsed_uri.path, headers=auth_headers)
resp = conn.getresponse()
data = resp.read()
if resp.status >= 400:
raise Exception("error fetching REST API config. Status code: {}. {}, {}".format(resp.status, resp.reason, data))
if not data:
raise Exception("data from REST API service was empty")
return parse_rtc_config(data)
def wait_for_app_ready(ready_file, app_wait_ready = False):
"""Wait for streaming app ready signal.
returns when either app_wait_ready is True OR the file at ready_file exists.
Keyword Arguments:
app_wait_ready {bool} -- skip wait for appready file (default: {True})
"""
logger.info("Waiting for streaming app ready")
logging.debug("app_wait_ready=%s, ready_file=%s" % (app_wait_ready, ready_file))
while app_wait_ready and not os.path.exists(ready_file):
time.sleep(0.2)
def set_json_app_argument(config_path, key, value):
"""Writes kv pair to json argument file
Arguments:
config_path {string} -- path to json config file, example: /tmp/selkies_config.json
key {string} -- the name of the argument to set
value {any} -- the value of the argument to set
"""
if not os.path.exists(config_path):
# Create new file
with open(config_path, 'w') as f:
json.dump({}, f)
# Read current config JSON
json_data = json.load(open(config_path))
# Set the new value for the argument
json_data[key] = value
# Save the json file
json.dump(json_data, open(config_path, 'w'))
return True
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--json_config',
default=os.environ.get(
'SELKIES_JSON_CONFIG', '/tmp/selkies_config.json'),
help='Path to the JSON file containing argument key-value pairs that are overlaid with CLI arguments or environment variables, this path must be writable')
parser.add_argument('--addr',
default=os.environ.get(
'SELKIES_ADDR', '0.0.0.0'),
help='Host to listen to for the signaling and web server, default: "0.0.0.0"')
parser.add_argument('--port',
default=os.environ.get(
'SELKIES_PORT', '8080'),
help='Port to listen to for the signaling and web server, default: "8080"')
parser.add_argument('--web_root',
default=os.environ.get(
'SELKIES_WEB_ROOT', '/opt/gst-web'),
help='Path to directory containing web application files, default: "/opt/gst-web"')
parser.add_argument('--enable_https',
default=os.environ.get(
'SELKIES_ENABLE_HTTPS', 'false'),
help='Enable or disable HTTPS for the web application, specifying a valid server certificate is recommended')
parser.add_argument('--https_cert',
default=os.environ.get(
'SELKIES_HTTPS_CERT', '/etc/ssl/certs/ssl-cert-snakeoil.pem'),
help='Path to the TLS server certificate file when HTTPS is enabled')
parser.add_argument('--https_key',
default=os.environ.get(
'SELKIES_HTTPS_KEY', '/etc/ssl/private/ssl-cert-snakeoil.key'),
help='Path to the TLS server private key file when HTTPS is enabled, set to an empty value if the private key is included in the certificate')
parser.add_argument('--enable_basic_auth',
default=os.environ.get(
'SELKIES_ENABLE_BASIC_AUTH', 'true'),
help='Enable basic authentication on server, must set --basic_auth_password and optionally --basic_auth_user to enforce basic authentication')
parser.add_argument('--basic_auth_user',
default=os.environ.get(
'SELKIES_BASIC_AUTH_USER', os.environ.get('USER', '')),
help='Username for basic authentication, default is to use the USER environment variable or a blank username if not present, must also set --basic_auth_password to enforce basic authentication')
parser.add_argument('--basic_auth_password',
default=os.environ.get(
'SELKIES_BASIC_AUTH_PASSWORD', 'mypasswd'),
help='Password used when basic authentication is set')
parser.add_argument('--rtc_config_json',
default=os.environ.get(
'SELKIES_RTC_CONFIG_JSON', '/tmp/rtc.json'),
help='JSON file with WebRTC configuration to use, checked periodically, overriding all other STUN/TURN settings')
parser.add_argument('--turn_rest_uri',
default=os.environ.get(
'SELKIES_TURN_REST_URI', ''),
help='URI for TURN REST API service, example: http://localhost:8008')
parser.add_argument('--turn_rest_username',
default=os.environ.get(
'SELKIES_TURN_REST_USERNAME', "selkies-{}".format(socket.gethostname())),
help='URI for TURN REST API service, default set to system hostname')
parser.add_argument('--turn_rest_username_auth_header',
default=os.environ.get(
'SELKIES_TURN_REST_USERNAME_AUTH_HEADER', 'x-auth-user'),
help='Header to pass user to TURN REST API service')
parser.add_argument('--turn_rest_protocol_header',
default=os.environ.get(
'SELKIES_TURN_REST_PROTOCOL_HEADER', 'x-turn-protocol'),
help='Header to pass desired TURN protocol to TURN REST API service')
parser.add_argument('--turn_rest_tls_header',
default=os.environ.get(
'SELKIES_TURN_REST_TLS_HEADER', 'x-turn-tls'),
help='Header to pass TURN (D)TLS usage to TURN REST API service')
parser.add_argument('--turn_host',
default=os.environ.get(
'SELKIES_TURN_HOST', 'staticauth.openrelay.metered.ca'),
help='TURN host when generating RTC config from shared secret or using long-term credentials, IPv6 addresses must be enclosed with square brackets such as [::1]')
parser.add_argument('--turn_port',
default=os.environ.get(
'SELKIES_TURN_PORT', '443'),
help='TURN port when generating RTC config from shared secret or using long-term credentials')
parser.add_argument('--turn_protocol',
default=os.environ.get(
'SELKIES_TURN_PROTOCOL', 'udp'),
help='TURN protocol for the client to use ("udp" or "tcp"), set to "tcp" without the quotes if "udp" is blocked on the network, "udp" is otherwise strongly recommended')
parser.add_argument('--turn_tls',
default=os.environ.get(
'SELKIES_TURN_TLS', 'false'),
help='Enable or disable TURN over TLS (for the TCP protocol) or TURN over DTLS (for the UDP protocol), valid TURN server certificate required')
parser.add_argument('--turn_shared_secret',
default=os.environ.get(
'SELKIES_TURN_SHARED_SECRET', 'openrelayprojectsecret'),
help='Shared TURN secret used to generate HMAC credentials, also requires --turn_host and --turn_port')
parser.add_argument('--turn_username',
default=os.environ.get(
'SELKIES_TURN_USERNAME', ''),
help='Legacy non-HMAC TURN credential username, also requires --turn_host and --turn_port')
parser.add_argument('--turn_password',
default=os.environ.get(
'SELKIES_TURN_PASSWORD', ''),
help='Legacy non-HMAC TURN credential password, also requires --turn_host and --turn_port')
parser.add_argument('--stun_host',
default=os.environ.get(
'SELKIES_STUN_HOST', 'stun.l.google.com'),
help='STUN host for NAT hole punching with WebRTC, change to your internal STUN/TURN server for local networks without internet, defaults to "stun.l.google.com"')
parser.add_argument('--stun_port',
default=os.environ.get(
'SELKIES_STUN_PORT', '19302'),
help='STUN port for NAT hole punching with WebRTC, change to your internal STUN/TURN server for local networks without internet, defaults to "19302"')
parser.add_argument('--app_wait_ready',
default=os.environ.get('SELKIES_APP_WAIT_READY', 'false'),
help='Waits for --app_ready_file to exist before starting stream if set to "true"')
parser.add_argument('--app_ready_file',
default=os.environ.get('SELKIES_APP_READY_FILE', '/tmp/selkies-appready'),
help='File set by sidecar used to indicate that app is initialized and ready')
parser.add_argument('--uinput_mouse_socket',
default=os.environ.get('SELKIES_UINPUT_MOUSE_SOCKET', ''),
help='Path to the uinput mouse socket, if not provided uinput is used directly')
parser.add_argument('--js_socket_path',
default=os.environ.get('SELKIES_JS_SOCKET_PATH', '/tmp'),
help='Directory to write the Selkies Joystick Interposer communication sockets to, default: /tmp, results in socket files: /tmp/selkies_js{0-3}.sock')
parser.add_argument('--encoder',
default=os.environ.get('SELKIES_ENCODER', 'x264enc'),
help='GStreamer video encoder to use')
parser.add_argument('--gpu_id',
default=os.environ.get('SELKIES_GPU_ID', '0'),
help='GPU ID for GStreamer hardware video encoders, will use enumerated GPU ID (0, 1, ..., n) for NVIDIA and /dev/dri/renderD{128 + n} for VA-API')
parser.add_argument('--framerate',
default=os.environ.get('SELKIES_FRAMERATE', '60'),
help='Framerate of the streamed remote desktop')
parser.add_argument('--video_bitrate',
default=os.environ.get('SELKIES_VIDEO_BITRATE', '8000'),
help='Default video bitrate in kilobits per second')
parser.add_argument('--keyframe_distance',
default=os.environ.get('SELKIES_KEYFRAME_DISTANCE', '-1'),
help='Distance between video keyframes/GOP-frames in seconds, defaults to "-1" for infinite keyframe distance (ideal for low latency and preventing periodic blurs)')
parser.add_argument('--congestion_control',
default=os.environ.get('SELKIES_CONGESTION_CONTROL', 'false'),
help='Enable Google Congestion Control (GCC), suggested if network conditions fluctuate and when bandwidth is >= 2 mbps but may lead to lower quality and microstutter due to adaptive bitrate in some encoders')
parser.add_argument('--video_packetloss_percent',
default=os.environ.get('SELKIES_VIDEO_PACKETLOSS_PERCENT', '0'),
help='Expected packet loss percentage (%%) for ULP/RED Forward Error Correction (FEC) in video, use "0" to disable FEC, less effective because of other mechanisms including NACK/PLI, enabling not recommended if Google Congestion Control is enabled')
parser.add_argument('--audio_bitrate',
default=os.environ.get('SELKIES_AUDIO_BITRATE', '128000'),
help='Default audio bitrate in bits per second')
parser.add_argument('--audio_channels',
default=os.environ.get('SELKIES_AUDIO_CHANNELS', '2'),
help='Number of audio channels, defaults to stereo (2 channels)')
parser.add_argument('--audio_packetloss_percent',
default=os.environ.get('SELKIES_AUDIO_PACKETLOSS_PERCENT', '0'),
help='Expected packet loss percentage (%%) for ULP/RED Forward Error Correction (FEC) in audio, use "0" to disable FEC')
parser.add_argument('--enable_clipboard',
default=os.environ.get('SELKIES_ENABLE_CLIPBOARD', 'true'),
help='Enable or disable the clipboard features, supported values: true, false, in, out')
parser.add_argument('--enable_resize',
default=os.environ.get('SELKIES_ENABLE_RESIZE', 'false'),
help='Enable dynamic resizing to match browser size')
parser.add_argument('--enable_cursors',
default=os.environ.get('SELKIES_ENABLE_CURSORS', 'true'),
help='Enable passing remote cursors to client')
parser.add_argument('--debug_cursors',
default=os.environ.get('SELKIES_DEBUG_CURSORS', 'false'),
help='Enable cursor debug logging')
parser.add_argument('--cursor_size',
default=os.environ.get('SELKIES_CURSOR_SIZE', os.environ.get('XCURSOR_SIZE', '-1')),
help='Cursor size in points for the local cursor, set instead XCURSOR_SIZE without of this argument to configure the cursor size for both the local and remote cursors')
parser.add_argument('--enable_webrtc_statistics',
default=os.environ.get('SELKIES_ENABLE_WEBRTC_STATISTICS', 'false'),
help='Enable WebRTC Statistics CSV dumping to the directory --webrtc_statistics_dir with filenames selkies-stats-video-[timestamp].csv and selkies-stats-audio-[timestamp].csv')
parser.add_argument('--webrtc_statistics_dir',
default=os.environ.get('SELKIES_WEBRTC_STATISTICS_DIR', '/tmp'),
help='Directory to save WebRTC Statistics CSV from client with filenames selkies-stats-video-[timestamp].csv and selkies-stats-audio-[timestamp].csv')
parser.add_argument('--enable_metrics_http',
default=os.environ.get('SELKIES_ENABLE_METRICS_HTTP', 'false'),
help='Enable the Prometheus HTTP metrics port')
parser.add_argument('--metrics_http_port',
default=os.environ.get('SELKIES_METRICS_HTTP_PORT', '8000'),
help='Port to start the Prometheus metrics server on')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
args = parser.parse_args()
if os.path.exists(args.json_config):
# Read and overlay arguments from json file
# Note that these are explicit overrides only.
try:
json_args = json.load(open(args.json_config))
for k, v in json_args.items():
if k == "framerate":
args.framerate = str(int(v))
if k == "video_bitrate":
args.video_bitrate = str(int(v))
if k == "audio_bitrate":
args.audio_bitrate = str(int(v))
if k == "enable_resize":
args.enable_resize = str((str(v).lower() == 'true')).lower()
if k == "encoder":
args.encoder = v.lower()
except Exception as e:
logger.error("failed to load json config from {}: {}".format(args.json_config, str(e)))
logging.warn(args)
# Set log level
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# Wait for streaming app to initialize
wait_for_app_ready(args.app_ready_file, args.app_wait_ready.lower() == "true")
# Peer id for this app, default is 0, expecting remote peer id to be 1
my_id = 0
peer_id = 1
my_audio_id = 2
audio_peer_id = 3
# Initialize metrics server
using_metrics_http = args.enable_metrics_http.lower() == 'true'
using_webrtc_csv = args.enable_webrtc_statistics.lower() == 'true'
metrics = Metrics(int(args.metrics_http_port), using_webrtc_csv)
# Initialize the signalling client
using_https = args.enable_https.lower() == 'true'
using_basic_auth = args.enable_basic_auth.lower() == 'true'
ws_protocol = 'wss:' if using_https else 'ws:'
signalling = WebRTCSignalling('%s//127.0.0.1:%s/ws' % (ws_protocol, args.port), my_id, peer_id,
enable_https=using_https,
enable_basic_auth=using_basic_auth,
basic_auth_user=args.basic_auth_user,
basic_auth_password=args.basic_auth_password)
# Initialize signalling client for audio connection
audio_signalling = WebRTCSignalling('%s//127.0.0.1:%s/ws' % (ws_protocol, args.port), my_audio_id, audio_peer_id,
enable_https=using_https,
enable_basic_auth=using_basic_auth,
basic_auth_user=args.basic_auth_user,
basic_auth_password=args.basic_auth_password)
# Handle errors from the signalling server
async def on_signalling_error(e):
if isinstance(e, WebRTCSignallingErrorNoPeer):
# Waiting for peer to connect, retry in 2 seconds.
time.sleep(2)
await signalling.setup_call()
else:
logger.error("signalling error: %s", str(e))
app.stop_pipeline()
async def on_audio_signalling_error(e):
if isinstance(e, WebRTCSignallingErrorNoPeer):
# Waiting for peer to connect, retry in 2 seconds.
time.sleep(2)
await audio_signalling.setup_call()
else:
logger.error("signalling error: %s", str(e))
audio_app.stop_pipeline()
signalling.on_error = on_signalling_error
audio_signalling.on_error = on_audio_signalling_error
signalling.on_disconnect = lambda: app.stop_pipeline()
audio_signalling.on_disconnect = lambda: audio_app.stop_pipeline()
# After connecting, attempt to setup call to peer
signalling.on_connect = signalling.setup_call
audio_signalling.on_connect = audio_signalling.setup_call
# [START main_setup]
# Fetch the TURN server and credentials
turn_rest_username = args.turn_rest_username.replace(":", "-")
rtc_config = None
turn_protocol = 'tcp' if args.turn_protocol.lower() == 'tcp' else 'udp'
using_turn_tls = args.turn_tls.lower() == 'true'
using_turn_rest = False
using_hmac_turn = False
using_rtc_config_json = False
if os.path.exists(args.rtc_config_json):
logger.warning("using JSON file from argument for RTC config, overrides all other STUN/TURN configuration")
with open(args.rtc_config_json, 'r') as f:
stun_servers, turn_servers, rtc_config = parse_rtc_config(f.read())
using_rtc_config_json = True
else:
if args.turn_rest_uri:
try:
stun_servers, turn_servers, rtc_config = fetch_turn_rest(
args.turn_rest_uri, turn_rest_username, args.turn_rest_username_auth_header, turn_protocol, args.turn_rest_protocol_header, using_turn_tls, args.turn_rest_tls_header)
logger.info("using TURN REST API RTC configuration, overrides long-term username/password or short-term shared secret STUN/TURN configuration")
using_turn_rest = True
except Exception as e:
logger.warning("error fetching TURN REST API RTC configuration, falling back to other methods: {}".format(str(e)))
using_turn_rest = False
if not using_turn_rest:
if (args.turn_username and args.turn_password) and (args.turn_host and args.turn_port):
config_json = make_turn_rtc_config_json_legacy(args.turn_host, args.turn_port, args.turn_username, args.turn_password, turn_protocol, using_turn_tls, args.stun_host, args.stun_port)
stun_servers, turn_servers, rtc_config = parse_rtc_config(config_json)
logger.info("using TURN long-term username/password credentials, prioritized over short-term shared secret configuration")
elif args.turn_shared_secret and (args.turn_host and args.turn_port):
hmac_data = generate_rtc_config(args.turn_host, args.turn_port, args.turn_shared_secret, turn_rest_username, turn_protocol, using_turn_tls, args.stun_host, args.stun_port)
stun_servers, turn_servers, rtc_config = parse_rtc_config(hmac_data)
logger.info("using TURN short-term shared secret HMAC credentials")
using_hmac_turn = True
else:
stun_servers, turn_servers, rtc_config = parse_rtc_config(DEFAULT_RTC_CONFIG)
logger.warning("missing TURN server information, using DEFAULT_RTC_CONFIG")
logger.info("initial server RTC configuration fetched")
# Extract arguments
enable_resize = args.enable_resize.lower() == "true"
audio_channels = int(args.audio_channels)
curr_fps = int(args.framerate)
gpu_id = int(args.gpu_id)
curr_video_bitrate = int(args.video_bitrate)
curr_audio_bitrate = int(args.audio_bitrate)
enable_cursors = args.enable_cursors.lower() == "true"
cursor_debug = args.debug_cursors.lower() == "true"
cursor_size = int(args.cursor_size)
keyframe_distance = float(args.keyframe_distance)
congestion_control = args.congestion_control.lower() == "true"
video_packetloss_percent = float(args.video_packetloss_percent)
audio_packetloss_percent = float(args.audio_packetloss_percent)
# Create instance of app
app = GSTWebRTCApp(stun_servers, turn_servers, audio_channels, curr_fps, args.encoder, gpu_id, curr_video_bitrate, curr_audio_bitrate, keyframe_distance, congestion_control, video_packetloss_percent, audio_packetloss_percent)
audio_app = GSTWebRTCApp(stun_servers, turn_servers, audio_channels, curr_fps, args.encoder, gpu_id, curr_video_bitrate, curr_audio_bitrate, keyframe_distance, congestion_control, video_packetloss_percent, audio_packetloss_percent)
# [END main_setup]
# Send the local sdp to signalling when offer is generated.
app.on_sdp = signalling.send_sdp
audio_app.on_sdp = audio_signalling.send_sdp
# Send ICE candidates to the signalling server
app.on_ice = signalling.send_ice
audio_app.on_ice = audio_signalling.send_ice
# Set the remote SDP when received from signalling server
signalling.on_sdp = app.set_sdp
audio_signalling.on_sdp = audio_app.set_sdp
# Set ICE candidates received from signalling server
signalling.on_ice = app.set_ice
audio_signalling.on_ice = audio_app.set_ice
# Start the pipeline once the session is established.
def on_session_handler(session_peer_id, meta=None):
logger.info("starting session for peer id {} with meta: {}".format(session_peer_id, meta))
if str(session_peer_id) == str(peer_id):
if meta:
if enable_resize:
if meta["res"]:
on_resize_handler(meta["res"])
if meta["scale"]:
on_scaling_ratio_handler(meta["scale"])
else:
logger.info("setting cursor to default size")
set_cursor_size(16)
logger.info("starting video pipeline")
app.start_pipeline()
elif str(session_peer_id) == str(audio_peer_id):
logger.info("starting audio pipeline")
audio_app.start_pipeline(audio_only=True)
else:
logger.error("failed to start pipeline for peer_id: %s" % peer_id)
signalling.on_session = on_session_handler
audio_signalling.on_session = on_session_handler
# Initialize the Xinput instance
cursor_scale = 1.0
webrtc_input = WebRTCInput(
args.uinput_mouse_socket,
args.js_socket_path,
args.enable_clipboard.lower(),
enable_cursors,
cursor_size,
cursor_scale,
cursor_debug)
# Handle changed cursors
webrtc_input.on_cursor_change = lambda data: app.send_cursor_data(data)
# Log message when data channel is open
def data_channel_ready():
logger.info(
"opened peer data channel for user input to X11")
app.send_framerate(app.framerate)
app.send_video_bitrate(app.video_bitrate)
app.send_audio_bitrate(audio_app.audio_bitrate)
app.send_resize_enabled(enable_resize)
app.send_encoder(app.encoder)
app.send_cursor_data(app.last_cursor_sent)
app.on_data_open = lambda: data_channel_ready()
# Send incoming messages from data channel to input handler
app.on_data_message = webrtc_input.on_message
# Send video bitrate messages to app
webrtc_input.on_video_encoder_bit_rate = lambda bitrate: set_json_app_argument(args.json_config, "video_bitrate", bitrate) and (app.set_video_bitrate(int(bitrate)))
# Send audio bitrate messages to app
webrtc_input.on_audio_encoder_bit_rate = lambda bitrate: set_json_app_argument(args.json_config, "audio_bitrate", bitrate) and audio_app.set_audio_bitrate(int(bitrate))
# Send pointer visibility setting to app
webrtc_input.on_mouse_pointer_visible = lambda visible: app.set_pointer_visible(
visible)
# Send clipboard contents when requested
webrtc_input.on_clipboard_read = lambda data: app.send_clipboard_data(data)
# Write framerate argument to local configuration and then tell client to reload.
def set_fps_handler(fps):
set_json_app_argument(args.json_config, "framerate", fps)
app.set_framerate(fps)
webrtc_input.on_set_fps = lambda fps: set_fps_handler(fps)
# Handler for resize events.
app.last_resize_success = True
def on_resize_handler(res):
# Trigger resize and reload if it changed.
curr_res, new_res, _, __, ___ = get_new_res(res)
if curr_res != new_res:
if not app.last_resize_success:
logger.warning("skipping resize because last resize failed.")
return
logger.warning("resizing display from {} to {}".format(curr_res, new_res))
if resize_display(res):
app.send_remote_resolution(res)
# Initial binding of enable resize handler.
if enable_resize:
webrtc_input.on_resize = on_resize_handler
else:
webrtc_input.on_resize = lambda res: logger.warning("remote resize is disabled, skipping resize to %s" % res)
# Handle for DPI events.
def on_scaling_ratio_handler(scale):
if scale < 0.75 or scale > 2.5:
logger.error("requested scale ratio out of bounds: {}".format(scale))
return
dpi = int(96 * scale)
logger.info("Setting DPI to: {}".format(dpi))
if not set_dpi(dpi):
logger.error("failed to set DPI to {}".format(dpi))
cursor_size = int(16 * scale)
logger.info("Setting cursor size to: {}".format(cursor_size))
if not set_cursor_size(cursor_size):
logger.error("failed to set cursor size to {}".format(cursor_size))
# Bind DPI handler.
if enable_resize:
webrtc_input.on_scaling_ratio = on_scaling_ratio_handler
else:
webrtc_input.on_scaling_ratio = lambda scale: logger.warning("remote resize is disabled, skipping DPI scale change to %s" % str(scale))
webrtc_input.on_ping_response = lambda latency: app.send_latency_time(latency)
# Enable resize with resolution handler
def enable_resize_handler(enabled, enable_res):
set_json_app_argument(args.json_config, "enable_resize", enabled)
if enabled:
# Bind the handlers
webrtc_input.on_resize = on_resize_handler
webrtc_input.on_scaling_ratio = on_scaling_ratio_handler
# Trigger resize and reload if it changed.
on_resize_handler(enable_res)
else:
logger.info("removing handler for on_resize")
webrtc_input.on_resize = lambda res: logger.warning("remote resize is disabled, skipping resize to %s" % res)
webrtc_input.on_scaling_ratio = lambda scale: logger.warning("remote resize is disabled, skipping DPI scale change to %s" % str(scale))
webrtc_input.on_set_enable_resize = enable_resize_handler
# Send client FPS to metrics
webrtc_input.on_client_fps = lambda fps: metrics.set_fps(fps)
# Send client latency to metrics
webrtc_input.on_client_latency = lambda latency_ms: metrics.set_latency(latency_ms)
# Send WebRTC stats to metrics
webrtc_input.on_client_webrtc_stats = lambda webrtc_stat_type, webrtc_stats: metrics.set_webrtc_stats(webrtc_stat_type, webrtc_stats)
# Initialize GPU monitor
gpu_mon = GPUMonitor(enabled=args.encoder.startswith("nv"))
# Send the GPU stats when available.
def on_gpu_stats(load, memory_total, memory_used):
app.send_gpu_stats(load, memory_total, memory_used)
metrics.set_gpu_utilization(load * 100)
gpu_mon.on_stats = on_gpu_stats
# Initialize the system monitor
system_mon = SystemMonitor()
def on_sysmon_timer(t):
webrtc_input.ping_start = t
app.send_system_stats(system_mon.cpu_percent, system_mon.mem_total, system_mon.mem_used)
app.send_ping(t)
system_mon.on_timer = on_sysmon_timer
# [START main_start]
# Connect to the signalling server and process messages.
loop = asyncio.get_event_loop()
# Handle SIGINT and SIGTERM where KeyboardInterrupt has issues with asyncio
loop.add_signal_handler(signal.SIGINT, lambda: sys.exit(1))
loop.add_signal_handler(signal.SIGTERM, lambda: sys.exit(1))
webrtc_input.loop = loop
# Initialize the signaling and web server
options = argparse.Namespace()
options.addr = args.addr
options.port = args.port
options.enable_basic_auth = using_basic_auth
options.basic_auth_user = args.basic_auth_user
options.basic_auth_password = args.basic_auth_password
options.enable_https = using_https
options.https_cert = args.https_cert
options.https_key = args.https_key
options.health = "/health"
options.web_root = os.path.abspath(args.web_root)
options.keepalive_timeout = 30
options.cert_restart = False # using_https
options.rtc_config_file = args.rtc_config_json
options.rtc_config = rtc_config
options.turn_shared_secret = args.turn_shared_secret if using_hmac_turn else ''
options.turn_host = args.turn_host if using_hmac_turn else ''
options.turn_port = args.turn_port if using_hmac_turn else ''
options.turn_protocol = turn_protocol
options.turn_tls = using_turn_tls
options.turn_auth_header_name = args.turn_rest_username_auth_header
options.stun_host = args.stun_host
options.stun_port = args.stun_port
server = WebRTCSimpleServer(loop, options)
# Callback method to update TURN servers of a running pipeline.
def mon_rtc_config(stun_servers, turn_servers, rtc_config):
if app.webrtcbin:
logger.info("updating STUN server")
app.webrtcbin.set_property("stun-server", stun_servers[0])
for i, turn_server in enumerate(turn_servers):
logger.info("updating TURN server")
if i == 0:
app.webrtcbin.set_property("turn-server", turn_server)
else:
app.webrtcbin.emit("add-turn-server", turn_server)
server.set_rtc_config(rtc_config)
# Initialize periodic monitor to refresh TURN RTC config when using shared secret.
hmac_turn_mon = HMACRTCMonitor(
args.turn_host,
args.turn_port,
args.turn_shared_secret,
turn_rest_username,
turn_protocol=turn_protocol,
turn_tls=using_turn_tls,
stun_host=args.stun_host,
stun_port=args.stun_port,
period=60, enabled=using_hmac_turn)
hmac_turn_mon.on_rtc_config = mon_rtc_config
# Initialize REST API RTC config monitor to periodically refresh the REST API RTC config.
turn_rest_mon = RESTRTCMonitor(
args.turn_rest_uri,
turn_rest_username,
args.turn_rest_username_auth_header,
turn_protocol=turn_protocol,
turn_rest_protocol_header=args.turn_rest_protocol_header,
turn_tls=using_turn_tls,
turn_rest_tls_header=args.turn_rest_tls_header,
period=60, enabled=using_turn_rest)
turn_rest_mon.on_rtc_config = mon_rtc_config
# Initialize file watcher for RTC config JSON file.
rtc_file_mon = RTCConfigFileMonitor(
rtc_file=args.rtc_config_json,
enabled=using_rtc_config_json)
rtc_file_mon.on_rtc_config = mon_rtc_config
try:
asyncio.ensure_future(server.run(), loop=loop)
if using_metrics_http:
metrics.start_http()
loop.run_until_complete(webrtc_input.connect())
loop.run_in_executor(None, lambda: webrtc_input.start_clipboard())
loop.run_in_executor(None, lambda: webrtc_input.start_cursor_monitor())
loop.run_in_executor(None, lambda: gpu_mon.start(gpu_id))
loop.run_in_executor(None, lambda: hmac_turn_mon.start())
loop.run_in_executor(None, lambda: turn_rest_mon.start())
loop.run_in_executor(None, lambda: rtc_file_mon.start())
loop.run_in_executor(None, lambda: system_mon.start())
while True:
if using_webrtc_csv:
metrics.initialize_webrtc_csv_file(args.webrtc_statistics_dir)
asyncio.ensure_future(app.handle_bus_calls(), loop=loop)
asyncio.ensure_future(audio_app.handle_bus_calls(), loop=loop)
loop.run_until_complete(signalling.connect())
loop.run_until_complete(audio_signalling.connect())
# asyncio.ensure_future(signalling.start(), loop=loop)
asyncio.ensure_future(audio_signalling.start(), loop=loop)
loop.run_until_complete(signalling.start())
app.stop_pipeline()
audio_app.stop_pipeline()
webrtc_input.stop_js_server()
except Exception as e:
logger.error("Caught exception: %s" % e)
traceback.print_exc()
sys.exit(1)
finally:
app.stop_pipeline()
audio_app.stop_pipeline()
webrtc_input.stop_clipboard()
webrtc_input.stop_cursor_monitor()
webrtc_input.stop_js_server()
webrtc_input.disconnect()
gpu_mon.stop()
hmac_turn_mon.stop()
turn_rest_mon.stop()
rtc_file_mon.stop()
system_mon.stop()
loop.run_until_complete(server.stop())
sys.exit(0)
# [END main_start]
if __name__ == '__main__':
main()