diff --git a/src/xpra/client/ui_client_base.py b/src/xpra/client/ui_client_base.py index 9128e47b45..dc3a2b3d2f 100644 --- a/src/xpra/client/ui_client_base.py +++ b/src/xpra/client/ui_client_base.py @@ -45,7 +45,7 @@ from xpra.simple_stats import std_unit from xpra.net import compression, packet_encoding from xpra.daemon_thread import make_daemon_thread -from xpra.os_util import thread, Queue, os_info, platform_name, get_machine_id, get_user_uuid, bytestostr +from xpra.os_util import Queue, os_info, platform_name, get_machine_id, get_user_uuid, bytestostr from xpra.util import nonl, std, AtomicInteger, AdHocStruct, log_screen_sizes, typedict, CLIENT_EXIT try: from xpra.clipboard.clipboard_base import ALL_CLIPBOARDS @@ -1551,8 +1551,8 @@ def sound_source_state_changed(*args): def sound_source_bitrate_changed(*args): self.emit("microphone-changed") try: - from xpra.sound.gstreamer_util import start_sending_sound - self.sound_source = start_sending_sound(self.sound_source_plugin, None, 1.0, self.server_sound_decoders, self.microphone_codecs, self.server_pulseaudio_server, self.server_pulseaudio_id) + from xpra.sound.wrapper import start_sending_sound + self.sound_source = start_sending_sound(self.sound_source_plugin, None, 1.0, self.server_sound_decoders, self.server_pulseaudio_server, self.server_pulseaudio_id) if not self.sound_source: return False self.sound_source.connect("new-buffer", self.new_sound_buffer) @@ -1571,15 +1571,11 @@ def stop_sending_sound(self): ss = self.sound_source self.microphone_enabled = False self.sound_source = None - def stop_sending_sound_thread(): - soundlog("UIXpraClient.stop_sending_sound_thread()") - if ss is None: - log.warn("stop_sending_sound: sound not started!") - return - ss.cleanup() - self.emit("microphone-changed") - soundlog("UIXpraClient.stop_sending_sound_thread() done") - thread.start_new_thread(stop_sending_sound_thread, ()) + if ss is None: + log.warn("stop_sending_sound: sound not started!") + return + ss.cleanup() + self.emit("microphone-changed") def start_receiving_sound(self): """ ask the server to start sending sound and emit the client signal """ @@ -1617,15 +1613,8 @@ def stop_receiving_sound(self, tell_server=True): if ss is None: return self.sound_sink = None - def stop_receiving_sound_thread(): - soundlog("UIXpraClient.stop_receiving_sound_thread()") - if ss is None: - log("stop_receiving_sound: sound not started!") - return - ss.cleanup() - self.emit("speaker-changed") - soundlog("UIXpraClient.stop_receiving_sound_thread() done") - thread.start_new_thread(stop_receiving_sound_thread, ()) + ss.cleanup() + self.emit("speaker-changed") def bump_sound_sequence(self): if self.server_sound_sequence: @@ -1652,14 +1641,17 @@ def sound_sink_bitrate_changed(self, sound_sink, bitrate): #not shown in the UI, so don't bother with emitting a signal: #self.emit("speaker-changed") def sound_sink_error(self, sound_sink, error): - log.warn("stopping speaker because of error: %s", error) + soundlog.warn("stopping speaker because of error: %s", error) + self.stop_receiving_sound() + def sound_process_stopped(self, sound_sink, *args): + soundlog("the sound sink process has stopped (%s)", args) self.stop_receiving_sound() def sound_sink_overrun(self, *args): if self.sink_restart_pending: soundlog("overrun re-start is already pending") return - log.warn("re-starting speaker because of overrun") + soundlog.warn("re-starting speaker because of overrun") codec = self.sound_sink.codec self.sink_restart_pending = True if self.server_sound_sequence: @@ -1680,12 +1672,16 @@ def start_sound_sink(self, codec): assert self.sound_sink is None, "sound sink already exists!" try: soundlog("starting %s sound sink", codec) - from xpra.sound.sink import SoundSink - self.sound_sink = SoundSink(codec=codec) + from xpra.sound.wrapper import start_receiving_sound + self.sound_sink = start_receiving_sound(codec) + if not self.sound_sink: + return False self.sound_sink.connect("state-changed", self.sound_sink_state_changed) self.sound_sink.connect("bitrate-changed", self.sound_sink_bitrate_changed) self.sound_sink.connect("error", self.sound_sink_error) self.sound_sink.connect("overrun", self.sound_sink_overrun) + from xpra.net.protocol import Protocol + self.sound_sink.connect(Protocol.CONNECTION_LOST, self.sound_process_stopped) self.sound_sink.start() soundlog("%s sound sink started", codec) return True diff --git a/src/xpra/net/bytestreams.py b/src/xpra/net/bytestreams.py index 007dd95d1a..949541d686 100644 --- a/src/xpra/net/bytestreams.py +++ b/src/xpra/net/bytestreams.py @@ -21,6 +21,15 @@ errno.ECONNRESET : "ECONNRESET", errno.EPIPE : "EPIPE"} continue_wait = 0 + +#default to using os.read and os.write for both tty devices and regular streams +#(but overriden for win32 below for tty devices to workaround an OS "feature") +OS_READ = os.read +OS_WRITE = os.write +TTY_READ = os.read +TTY_WRITE = os.write + + if sys.platform.startswith("win"): #on win32, we have to deal with a few more odd error codes: #(it would be nicer if those were wrapped using errno instead..) @@ -45,6 +54,16 @@ #on win32, we want to wait just a little while, #to prevent servers spinning wildly on non-blocking sockets: continue_wait = 5 + if sys.version[0]<"3": + #win32 has problems writing more than 32767 characters to stdout! + #see: http://bugs.python.org/issue11395 + #(this is fixed in python 3.2 and we don't care about 3.0 or 3.1) + def win32ttywrite(fd, buf): + #this awful limitation only applies to tty devices: + if len(buf)>32767: + buf = buf[:32767] + return os.write(fd, buf) + TTY_WRITE = win32ttywrite def untilConcludes(is_active_cb, f, *a, **kw): @@ -120,6 +139,16 @@ def __init__(self, writeable, readable, abort_test=None, target=None, info="", c Connection.__init__(self, target, info) self._writeable = writeable self._readable = readable + self._read_fd = self._readable.fileno() + self._write_fd = self._writeable.fileno() + if os.isatty(self._read_fd): + self._osread = TTY_READ + else: + self._osread = OS_READ + if os.isatty(self._write_fd): + self._oswrite = TTY_WRITE + else: + self._oswrite = OS_WRITE self._abort_test = abort_test self._close_cb = close_cb @@ -130,19 +159,22 @@ def may_abort(self, action): def read(self, n): self.may_abort("read") - return self._read(os.read, self._readable.fileno(), n) + return self._read(self._osread, self._read_fd, n) def write(self, buf): self.may_abort("write") - return self._write(os.write, self._writeable.fileno(), buf) + return self._write(self._oswrite, self._write_fd, buf) def close(self): Connection.close(self) try: - self._writeable.close() self._readable.close() except: pass + try: + self._writeable.close() + except: + pass if self._close_cb: self._close_cb() diff --git a/src/xpra/os_util.py b/src/xpra/os_util.py index 55fc7c5847..ffb900c013 100644 --- a/src/xpra/os_util.py +++ b/src/xpra/os_util.py @@ -173,6 +173,26 @@ def force_quit(status=1): os._exit(status) +def disable_stdout_buffering(): + import gc + # Appending to gc.garbage is a way to stop an object from being + # destroyed. If the old sys.stdout is ever collected, it will + # close() stdout, which is not good. + gc.garbage.append(sys.stdout) + sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) + +def setbinarymode(fd): + if sys.platform.startswith("win"): + #turn on binary mode: + try: + import msvcrt + msvcrt.setmode(fd, os.O_BINARY) #@UndefinedVariable + except: + from xpra.log import Logger + log = Logger("util") + log.error("setting stdin to binary mode failed", exc_info=True) + + def find_lib(libname): #it would be better to rely on dlopen to find the paths #but I cannot find a way of getting ctypes to tell us the path diff --git a/src/xpra/platform/darwin/paths.py b/src/xpra/platform/darwin/paths.py index 93a1f0d115..66a574f3bc 100644 --- a/src/xpra/platform/darwin/paths.py +++ b/src/xpra/platform/darwin/paths.py @@ -67,3 +67,8 @@ def get_system_conf_dir(): #the system wide configuration directory default_conf_dir = "/Library/Application Support/Xpra" return os.environ.get("XPRA_SYSCONF_DIR", default_conf_dir) + + +def get_sound_executable(): + helper = os.path.join(get_app_dir(), "MacOS", "Xpra") + return os.environ.get("XPRA_SOUND_EXECUTABLE", helper) diff --git a/src/xpra/platform/paths.py b/src/xpra/platform/paths.py index 0564f9d6c5..769dc5da57 100644 --- a/src/xpra/platform/paths.py +++ b/src/xpra/platform/paths.py @@ -127,12 +127,17 @@ def get_license_text(self): return LICENSE_TEXT +def get_sound_executable(): + return os.environ.get("XPRA_SOUND_EXECUTABLE", "xpra") + + from xpra.platform import platform_import platform_import(globals(), "paths", True, "get_resources_dir", "get_app_dir", "get_icon_dir") platform_import(globals(), "paths", False, + "get_sound_executable", "get_install_prefix", "get_default_conf_dir", "get_system_conf_dir", @@ -151,6 +156,7 @@ def get_info(): "resources" : get_resources_dir(), "icons" : get_icon_dir(), "home" : os.path.expanduser("~"), + "sound_executable" : get_sound_executable(), } diff --git a/src/xpra/platform/win32/paths.py b/src/xpra/platform/win32/paths.py index 41c73dc9a0..2ea1754451 100644 --- a/src/xpra/platform/win32/paths.py +++ b/src/xpra/platform/win32/paths.py @@ -77,3 +77,6 @@ def get_app_dir(): return APP_DIR from xpra.platform.paths import default_get_app_dir #imported here to prevent import loop return default_get_app_dir() + +def get_sound_executable(): + return os.environ.get("XPRA_SOUND_EXECUTABLE", "xpra_cmd.exe") diff --git a/src/xpra/scripts/main.py b/src/xpra/scripts/main.py index c6ad931c25..64dc4a283c 100755 --- a/src/xpra/scripts/main.py +++ b/src/xpra/scripts/main.py @@ -803,7 +803,7 @@ def dump_frames(*arsg): def configure_logging(options, mode): - if mode in ("start", "upgrade", "attach", "shadow", "proxy"): + if mode in ("start", "upgrade", "attach", "shadow", "proxy", "_sound_record", "_sound_play"): if "help" in options.speaker_codec or "help" in options.microphone_codec: from xpra.sound.gstreamer_util import show_sound_codec_help info = show_sound_codec_help(mode!="attach", options.speaker_codec, options.microphone_codec) diff --git a/src/xpra/server/source.py b/src/xpra/server/source.py index b77dd50f14..d877428624 100644 --- a/src/xpra/server/source.py +++ b/src/xpra/server/source.py @@ -36,7 +36,7 @@ from xpra.net import compression from xpra.net.compression import compressed_wrapper, Compressed, Uncompressed from xpra.daemon_thread import make_daemon_thread -from xpra.os_util import platform_name, thread, Queue, get_machine_id, get_user_uuid +from xpra.os_util import platform_name, Queue, get_machine_id, get_user_uuid from xpra.server.background_worker import add_work_item from xpra.util import std, typedict, updict, get_screen_info, CLIENT_PING_TIMEOUT, WORKSPACE_UNSET, DEFAULT_METADATA_SUPPORTED @@ -764,7 +764,8 @@ def start_sending_sound(self, codec, volume=1.0): log.warn("not starting sound as we are suspended") return try: - from xpra.sound.gstreamer_util import start_sending_sound, ALLOW_SOUND_LOOP + from xpra.sound.gstreamer_util import ALLOW_SOUND_LOOP + from xpra.sound.wrapper import start_sending_sound if self.machine_id and self.machine_id==get_machine_id() and not ALLOW_SOUND_LOOP: #looks like we're on the same machine, verify it's a different user: if self.uuid==get_user_uuid(): @@ -773,16 +774,11 @@ def start_sending_sound(self, codec, volume=1.0): assert self.supports_speaker, "cannot send sound: support not enabled on the server" assert self.sound_source is None, "a sound source already exists" assert self.sound_receive, "cannot send sound: support is not enabled on the client" - self.sound_source = start_sending_sound(self.sound_source_plugin, codec, volume, self.sound_decoders, self.microphone_codecs, self.pulseaudio_server, self.pulseaudio_id) + self.sound_source = start_sending_sound(self.sound_source_plugin, codec, volume, self.sound_decoders, self.pulseaudio_server, self.pulseaudio_id) soundlog("start_sending_sound() sound source=%s", self.sound_source) if self.sound_source: - if self.server_driven: - #tell the client this is the start: - self.send("sound-data", self.sound_source.codec, "", - {"start-of-stream" : True, - "codec" : self.sound_source.codec, - "sequence" : self.sound_source_sequence}) self.sound_source.connect("new-buffer", self.new_sound_buffer) + self.sound_source.connect("new-stream", self.new_stream) self.sound_source.start() except Exception as e: log.error("error setting up sound: %s", e, exc_info=True) @@ -795,11 +791,17 @@ def stop_sending_sound(self): if self.server_driven: #tell the client this is the end: self.send("sound-data", ss.codec, "", {"end-of-stream" : True}) - def stop_sending_sound_thread(*args): - soundlog("stop_sending_sound_thread(%s)", args) - ss.cleanup() - soundlog("stop_sending_sound_thread(%s) done", args) - thread.start_new_thread(stop_sending_sound_thread, ()) + ss.cleanup() + + def new_stream(self, sound_source, codec): + soundlog("new_stream(%s)", codec) + self.sound_source.codec = codec + if self.server_driven: + #tell the client this is the start: + self.send("sound-data", self.sound_source.codec, "", + {"start-of-stream" : True, + "codec" : self.sound_source.codec, + "sequence" : self.sound_source_sequence}) def new_sound_buffer(self, sound_source, data, metadata): soundlog("new_sound_buffer(%s, %s, %s) suspended=%s, sequence=%s", @@ -815,11 +817,7 @@ def stop_receiving_sound(self): soundlog("stop_receiving_sound() sound_sink=%s", ss) if ss: self.sound_sink = None - def stop_receiving_sound_thread(*args): - soundlog("stop_receiving_sound_thread() sound_sink=%s", ss) - ss.cleanup() - soundlog("stop_receiving_sound_thread() done") - thread.start_new_thread(stop_receiving_sound_thread, ()) + ss.cleanup() def sound_control(self, action, *args): diff --git a/src/xpra/sound/gstreamer_util.py b/src/xpra/sound/gstreamer_util.py index d5e1a0bae7..3d0482cfda 100755 --- a/src/xpra/sound/gstreamer_util.py +++ b/src/xpra/sound/gstreamer_util.py @@ -7,7 +7,6 @@ import sys import os -from xpra.util import AdHocStruct from xpra.log import Logger log = Logger("sound") @@ -87,7 +86,8 @@ def get_queue_time(default_value=450): ] CODECS = {} -CODEC_ORDER = [MP3, WAVPACK, WAV, FLAC, SPEEX] +#CODEC_ORDER = [MP3, WAVPACK, WAV, FLAC, SPEEX] +CODEC_ORDER = [MP3, FLAC, SPEEX] #code to temporarily redirect stderr and restore it afterwards, adapted from: @@ -486,40 +486,6 @@ def parse_sound_source(sound_source_plugin, remote): return gst_sound_source_plugin, options -def start_sending_sound(sound_source_plugin, codec, volume, remote_decoders, local_decoders, remote_pulseaudio_server, remote_pulseaudio_id): - assert has_gst - try: - #info about the remote end: - remote = AdHocStruct() - remote.pulseaudio_server = remote_pulseaudio_server - remote.pulseaudio_id = remote_pulseaudio_id - remote.remote_decoders = remote_decoders - plugin, options = parse_sound_source(sound_source_plugin, remote) - if not plugin: - log.error("failed to setup '%s' sound stream source", (sound_source_plugin or "auto")) - return None - log("parsed '%s':", sound_source_plugin) - log("plugin=%s", plugin) - log("options=%s", options) - matching_codecs = [x for x in remote_decoders if x in local_decoders] - ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs] - if len(ordered_codecs)==0: - log.error("no matching codecs between remote (%s) and local (%s) - sound disabled", remote_decoders, local_decoders) - return None - if codec is not None and codec not in matching_codecs: - log.warn("invalid codec specified: %s", codec) - codec = None - if codec is None: - codec = ordered_codecs[0] - log("using sound codec %s", codec) - from xpra.sound.src import SoundSource - log.info("starting sound stream capture using %s source", PLUGIN_TO_DESCRIPTION.get(plugin, plugin)) - return SoundSource(plugin, options, codec, volume, {}) - except Exception as e: - log.error("error setting up sound: %s", e, exc_info=True) - return None - - def get_info(receive=True, send=True, receive_codecs=[], send_codecs=[]): if not has_gst: return {} diff --git a/src/xpra/sound/sink.py b/src/xpra/sound/sink.py index 9f670bbb56..7b42d1a838 100755 --- a/src/xpra/sound/sink.py +++ b/src/xpra/sound/sink.py @@ -8,7 +8,7 @@ from xpra.sound.sound_pipeline import SoundPipeline, gobject, one_arg_signal from xpra.sound.pulseaudio_util import has_pa -from xpra.sound.gstreamer_util import plugin_str, get_decoder_parser, get_queue_time, normv, MP3, CODECS, gst, QUEUE_LEAK, MS_TO_NS +from xpra.sound.gstreamer_util import plugin_str, get_decoder_parser, get_queue_time, normv, MP3, CODECS, CODEC_ORDER, gst, QUEUE_LEAK, MS_TO_NS from xpra.os_util import thread from xpra.log import Logger @@ -28,8 +28,13 @@ if os.name=="posix": SINKS += ["alsasink", "osssink", "oss4sink", "jackaudiosink"] -SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False, - "async" : True} +#SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False, +# "async" : True} +SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False, + "async" : True, + "qos" : True + } + SINK_DEFAULT_ATTRIBUTES = { "pulsesink" : {"client" : "Xpra"} } @@ -40,7 +45,10 @@ DEFAULT_SINK = SINKS[0] QUEUE_SILENT = 0 QUEUE_TIME = get_queue_time(450) +QUEUE_MIN = QUEUE_TIME//2 + +GST_FORMAT_BUFFERS = 4 def sink_has_device_attribute(sink): return sink not in ("autoaudiosink", "jackaudiosink", "directsoundsink") @@ -55,12 +63,18 @@ class SoundSink(SoundPipeline): "eos" : one_arg_signal, }) - def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_options={}): + def __init__(self, sink_type=None, sink_options={}, codecs=CODECS, codec_options={}, volume=1.0): + if not sink_type: + sink_type = DEFAULT_SINK assert sink_type in SINKS, "invalid sink: %s" % sink_type + matching = [x for x in CODEC_ORDER if (x in codecs and x in CODECS)] + log("SoundSink(..) found matching codecs %s", matching) + assert len(matching)>0, "no matching codecs between arguments %s and supported list %s" % (codecs, CODECS) + codec = matching[0] decoder, parser = get_decoder_parser(codec) SoundPipeline.__init__(self, codec) self.sink_type = sink_type - decoder_str = plugin_str(decoder, decoder_options) + decoder_str = plugin_str(decoder, codec_options) pipeline_els = [] pipeline_els.append("appsrc"+ " name=src"+ @@ -69,13 +83,15 @@ def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_option " block=0"+ " is-live=0"+ " stream-type=stream"+ - " format=4") + " format=%s" % GST_FORMAT_BUFFERS) pipeline_els.append(parser) pipeline_els.append(decoder_str) pipeline_els.append("audioconvert") pipeline_els.append("audioresample") + pipeline_els.append("volume name=volume volume=%s" % volume) queue_el = ["queue", "name=queue", + "min-threshold-time=%s" % QUEUE_MIN, "max-size-buffers=0", "max-size-bytes=0", "max-size-time=%s" % QUEUE_TIME, @@ -85,11 +101,13 @@ def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_option pipeline_els.append(" ".join(queue_el)) sink_attributes = SINK_SHARED_DEFAULT_ATTRIBUTES.copy() sink_attributes.update(SINK_DEFAULT_ATTRIBUTES.get(sink_type, {})) + sink_attributes.update(sink_options) sink_str = plugin_str(sink_type, sink_attributes) pipeline_els.append(sink_str) self.setup_pipeline_and_bus(pipeline_els) - self.src = self.pipeline.get_by_name("src") - self.queue = self.pipeline.get_by_name("queue") + self.volume = self.pipeline.get_by_name("volume") + self.src = self.pipeline.get_by_name("src") + self.queue = self.pipeline.get_by_name("queue") self.overruns = 0 self.queue_state = "starting" if QUEUE_SILENT==0: @@ -101,6 +119,12 @@ def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_option def __repr__(self): return "SoundSink('%s' - %s)" % (self.pipeline_str, self.state) + def cleanup(self): + SoundPipeline.cleanup(self) + self.sink_type = "" + self.src = None + + def queue_pushing(self, *args): ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) log("sound sink queue pushing: level=%s", ltime) @@ -109,11 +133,19 @@ def queue_pushing(self, *args): def queue_running(self, *args): ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) log("sound sink queue running: level=%s", ltime) + if self.queue_state=="underrun": + #lift min time restrictions: + #gobject.timeout_add(400, self.queue.set_property, "min-threshold-time", 0) + self.queue.set_property("min-threshold-time", 0) + #pass self.queue_state = "running" def queue_underrun(self, *args): ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) log("sound sink queue underrun: level=%s", ltime) + if self.queue_state!="underrun": + #lift min time restrictions: + self.queue.set_property("min-threshold-time", QUEUE_MIN) self.queue_state = "underrun" def queue_overrun(self, *args): @@ -121,18 +153,13 @@ def queue_overrun(self, *args): self.queue_state = "overrun" #no overruns for the first 2 seconds: elapsed = time.time()-self.start_time - if elapsed<2.0 or ltime<(QUEUE_TIME/MS_TO_NS/2*75/100): + if ltime<(QUEUE_TIME/MS_TO_NS/2*75/100): log("sound sink queue overrun ignored: level=%s, elapsed time=%.1f", ltime, elapsed) return log("sound sink queue overrun: level=%s", ltime) self.overruns += 1 self.emit("overrun", ltime) - def cleanup(self): - SoundPipeline.cleanup(self) - self.sink_type = "" - self.src = None - def eos(self): log("eos()") if self.src: @@ -150,10 +177,18 @@ def get_info(self): def add_data(self, data, metadata=None): #debug("sound sink: adding %s bytes to %s, metadata: %s, level=%s", len(data), self.src, metadata, int(self.queue.get_property("current-level-time")/MS_TO_NS)) + log("add_data(%s bytes, %s) queue_state=%s, src=%s", len(data), metadata, self.queue_state, self.src) if not self.src: return + if self.queue_state == "overrun": + clt = self.queue.get_property("current-level-time") + qpct = int(min(QUEUE_TIME, clt)*100.0/QUEUE_TIME) + if qpct<50: + self.queue_state = "running" + else: + log("dropping new data because of overrun: %s%%", qpct) + return buf = gst.new_buffer(data) - d = 10*MS_TO_NS if metadata: ts = metadata.get("timestamp") if ts is not None: @@ -161,7 +196,15 @@ def add_data(self, data, metadata=None): d = metadata.get("duration") if d is not None: buf.duration = normv(d) - log("add_data(..) queue_state=%s", self.queue_state) + #for seeing how the elapsed time evolves + #(cannot be used for much else as client and server may have different times!) + #t = metadata.get("time") + #if t: + # log("elapsed=%s (..)", int(time.time()*1000)-t) + #if we have caps, use them: + #caps = metadata.get("caps") + #if caps: + # buf.set_caps(gst.caps_from_string(caps)) if self.push_buffer(buf): self.buffer_count += 1 self.byte_count += len(data) diff --git a/src/xpra/sound/sound_pipeline.py b/src/xpra/sound/sound_pipeline.py index db697afb15..86b346a3a5 100644 --- a/src/xpra/sound/sound_pipeline.py +++ b/src/xpra/sound/sound_pipeline.py @@ -20,6 +20,7 @@ class SoundPipeline(gobject.GObject): "state-changed" : one_arg_signal, "bitrate-changed" : one_arg_signal, "error" : one_arg_signal, + "new-stream" : one_arg_signal, } def __init__(self, codec): @@ -37,6 +38,9 @@ def __init__(self, codec): self.buffer_count = 0 self.byte_count = 0 + def idle_emit(self, sig, *args): + gobject.idle_add(self.emit, sig, *args) + def get_info(self): info = {"codec" : self.codec, "codec_description" : self.codec_description, @@ -44,6 +48,7 @@ def get_info(self): "buffers" : self.buffer_count, "bytes" : self.byte_count, "pipeline" : self.pipeline_str, + "volume" : self.get_volume(), } if self.codec_mode: info["codec_mode"] = self.codec_mode @@ -79,25 +84,48 @@ def update_bitrate(self, new_bitrate): log("new bitrate: %s", self.bitrate) #self.emit("bitrate-changed", new_bitrate) + + def set_volume(self, volume=100): + if self.volume: + self.volume.set_property("volume", volume/100.0) + + def get_volume(self): + if self.volume: + return int(self.volume.get_property("volume")*100) + return 0 + + def start(self): log("SoundPipeline.start()") + self.idle_emit("new-stream", self.codec) self.state = "active" self.pipeline.set_state(gst.STATE_PLAYING) log("SoundPipeline.start() done") def stop(self): + if not self.pipeline: + return log("SoundPipeline.stop()") + #uncomment this to see why we end up calling stop() + #import traceback + #for x in traceback.format_stack(): + # for s in x.split("\n"): + # v = s.replace("\r", "").replace("\n", "") + # if v: + # log(v) self.state = "stopped" self.pipeline.set_state(gst.STATE_NULL) + self.volume = None log("SoundPipeline.stop() done") def cleanup(self): - log("SoundPipeline.cleanup()") self.stop() - if self.bus: - self.bus.remove_signal_watch() - if self.bus_message_handler_id: - self.bus.disconnect(self.bus_message_handler_id) + if not self.bus: + return + log("SoundPipeline.cleanup()") + self.bus.remove_signal_watch() + if self.bus_message_handler_id: + self.bus.disconnect(self.bus_message_handler_id) self.bus = None self.pipeline = None self.codec = None @@ -112,13 +140,13 @@ def on_message(self, bus, message): self.pipeline.set_state(gst.STATE_NULL) log.info("sound source EOS") self.state = "stopped" - self.emit("state-changed", self.state) + self.idle_emit("state-changed", self.state) elif t == gst.MESSAGE_ERROR: self.pipeline.set_state(gst.STATE_NULL) err, details = message.parse_error() log.error("sound source pipeline error: %s / %s", err, details) self.state = "error" - self.emit("state-changed", self.state) + self.idle_emit("state-changed", self.state) elif t == gst.MESSAGE_TAG: try: #Gst 0.10: @@ -138,12 +166,15 @@ def on_message(self, bus, message): _, new_state, _ = message.parse_state_changed() log("new-state=%s", gst.element_state_get_name(new_state)) self.state = self.do_get_state(new_state) - self.emit("state-changed", self.state) + self.idle_emit("state-changed", self.state) else: log("state changed: %s", message) elif t == gst.MESSAGE_DURATION: d = message.parse_duration() - log("duration changed: %s", d) + try: + log("duration changed: %s", d[1]) + except: + log("duration changed: %s", d) elif t == gst.MESSAGE_LATENCY: log.info("Latency message from %s: %s", message.src, message) elif t == gst.MESSAGE_INFO: diff --git a/src/xpra/sound/src.py b/src/xpra/sound/src.py index 96c034dfb5..921a89cefe 100755 --- a/src/xpra/sound/src.py +++ b/src/xpra/sound/src.py @@ -5,10 +5,12 @@ # later version. See the file COPYING for details. import sys +import time +from xpra.os_util import SIGNAMES from xpra.sound.sound_pipeline import SoundPipeline, gobject from xpra.gtk_common.gobject_util import n_arg_signal -from xpra.sound.gstreamer_util import plugin_str, get_encoder_formatter, get_source_plugins, get_queue_time, normv, MP3, CODECS, QUEUE_LEAK +from xpra.sound.gstreamer_util import plugin_str, get_encoder_formatter, get_source_plugins, get_queue_time, normv, MP3, CODECS, CODEC_ORDER, QUEUE_LEAK from xpra.log import Logger log = Logger("sound") @@ -25,13 +27,32 @@ class SoundSource(SoundPipeline): "new-buffer" : n_arg_signal(2), }) - def __init__(self, src_type, src_options={}, codec=MP3, volume=1.0, encoder_options={}): + def __init__(self, src_type=None, src_options={}, codecs=CODECS, codec_options={}, volume=1.0): + if not src_type: + from xpra.sound.pulseaudio_util import get_pa_device_options + monitor_devices = get_pa_device_options(True, False) + log.info("found pulseaudio monitor devices: %s", monitor_devices) + if len(monitor_devices)==0: + log.warn("could not detect any pulseaudio monitor devices - will use a test source") + src_type = "audiotestsrc" + default_src_options = {"wave":2, "freq":100, "volume":0.4} + else: + monitor_device = monitor_devices.items()[0][0] + log.info("using pulseaudio source device: %s", monitor_device) + src_type = "pulsesrc" + default_src_options = {"device" : monitor_device} + src_options = default_src_options + src_options.update(src_options) assert src_type in get_source_plugins(), "invalid source plugin '%s'" % src_type + matching = [x for x in CODEC_ORDER if (x in codecs and x in CODECS)] + log("SoundSource(..) found matching codecs %s", matching) + assert len(matching)>0, "no matching codecs between arguments %s and supported list %s" % (codecs, CODECS) + codec = matching[0] encoder, fmt = get_encoder_formatter(codec) SoundPipeline.__init__(self, codec) self.src_type = src_type source_str = plugin_str(src_type, src_options) - encoder_str = plugin_str(encoder, encoder_options) + encoder_str = plugin_str(encoder, codec_options) pipeline_els = [source_str] if AUDIOCONVERT: pipeline_els += ["audioconvert"] @@ -55,9 +76,9 @@ def __init__(self, src_type, src_options={}, codec=MP3, volume=1.0, encoder_opti self.volume = self.pipeline.get_by_name("volume") self.sink = self.pipeline.get_by_name("sink") self.sink.set_property("emit-signals", True) - self.sink.set_property("max-buffers", 10) + self.sink.set_property("max-buffers", 10) #0? self.sink.set_property("drop", False) - self.sink.set_property("sync", True) + self.sink.set_property("sync", True) #False? self.sink.set_property("qos", False) try: #Gst 1.0: @@ -71,15 +92,6 @@ def __init__(self, src_type, src_options={}, codec=MP3, volume=1.0, encoder_opti def __repr__(self): return "SoundSource('%s' - %s)" % (self.pipeline_str, self.state) - def set_volume(self, volume=1.0): - if self.sink and self.volume: - self.volume.set_property("volume", volume) - - def get_volume(self): - if self.sink and self.volume: - return self.volume.get_property("volume") - return 0 - def cleanup(self): SoundPipeline.cleanup(self) self.src_type = "" @@ -102,7 +114,8 @@ def emit_buffer1(self, sample): size = buf.get_size() data = buf.extract_dup(0, size) self.do_emit_buffer(data, {"timestamp" : normv(buf.pts), - "duration" : normv(buf.duration)}) + "duration" : normv(buf.duration), + }) def on_new_preroll0(self, appsink): @@ -125,82 +138,107 @@ def emit_buffer0(self, buf, metadata={}): # "duration" : buf.duration, # "offset" : buf.offset, # "offset_end": buf.offset_end} - self.do_emit_buffer(buf.data, {"timestamp" : normv(buf.timestamp), - "duration" : normv(buf.duration)}) + self.do_emit_buffer(buf.data, { + "caps" : buf.get_caps().to_string(), + "timestamp" : normv(buf.timestamp), + "duration" : normv(buf.duration) + }) def do_emit_buffer(self, data, metadata={}): self.buffer_count += 1 self.byte_count += len(data) - self.emit("new-buffer", data, metadata) + metadata["time"] = int(time.time()*1000) + self.idle_emit("new-buffer", data, metadata) + gobject.type_register(SoundSource) def main(): from xpra.platform import init, clean - init("Sound-Play") + init("Xpra-Sound-Source") try: import os.path + if "-v" in sys.argv: + log.enable_debug() + sys.argv.remove("-v") + if len(sys.argv) not in (2, 3): - print("usage: %s filename [codec]" % sys.argv[0]) + log.error("usage: %s filename [codec] [--encoder=rencode]", sys.argv[0]) return 1 filename = sys.argv[1] - if os.path.exists(filename): - print("file %s already exists" % filename) - return 2 + if filename=="-": + from xpra.os_util import disable_stdout_buffering + disable_stdout_buffering() + elif os.path.exists(filename): + log.error("file %s already exists", filename) + return 1 codec = None + if len(sys.argv)==3: codec = sys.argv[2] if codec not in CODECS: - print("invalid codec: %s, codecs supported: %s" % (codec, CODECS)) - return 2 + log.error("invalid codec: %s, codecs supported: %s", codec, CODECS) + return 1 else: parts = filename.split(".") if len(parts)>1: extension = parts[-1] if extension.lower() in CODECS: codec = extension.lower() - print("guessed codec %s from file extension %s" % (codec, extension)) + log.info("guessed codec %s from file extension %s", codec, extension) if codec is None: codec = MP3 - print("using default codec: %s" % codec) + log.info("using default codec: %s", codec) + + #in case we're running against pulseaudio, + #try to setup the env: + try: + from xpra.platform.paths import get_icon_filename + f = get_icon_filename("xpra.png") + from xpra.sound.pulseaudio_util import add_audio_tagging_env + add_audio_tagging_env(f) + except Exception as e: + log.warn("failed to setup pulseaudio tagging: %s", e) - log.enable_debug() from threading import Lock - f = open(filename, "wb") - from xpra.sound.pulseaudio_util import get_pa_device_options - monitor_devices = get_pa_device_options(True, False) - log.info("found pulseaudio monitor devices: %s", monitor_devices) - if len(monitor_devices)==0: - log.warn("could not detect any pulseaudio monitor devices - will use a test source") - ss = SoundSource("audiotestsrc", src_options={"wave":2, "freq":100, "volume":0.4}, codec=codec) + if filename=="-": + f = sys.stdout else: - monitor_device = monitor_devices.items()[0][0] - log.info("using pulseaudio source device: %s", monitor_device) - ss = SoundSource("pulsesrc", {"device" : monitor_device}, codec) + f = open(filename, "wb") + ss = SoundSource(codecs=[codec]) lock = Lock() def new_buffer(ss, data, metadata): - log.info("new buffer: %s bytes, metadata=%s" % (len(data), metadata)) + log.info("new buffer: %s bytes (%s), metadata=%s", len(data), type(data), metadata) with lock: if f: f.write(data) - ss.connect("new-buffer", new_buffer) - ss.start() + f.flush() gobject_mainloop = gobject.MainLoop() gobject.threads_init() + ss.connect("new-buffer", new_buffer) + ss.start() + import signal - def deadly_signal(*args): + def deadly_signal(sig, frame): + log.warn("got deadly signal %s", SIGNAMES.get(sig, sig)) + gobject.idle_add(ss.stop) gobject.idle_add(gobject_mainloop.quit) signal.signal(signal.SIGINT, deadly_signal) signal.signal(signal.SIGTERM, deadly_signal) - gobject_mainloop.run() + try: + gobject_mainloop.run() + except Exception as e: + log.error("main loop error: %s", e) + ss.stop() f.flush() - log.info("wrote %s bytes to %s", f.tell(), filename) + if f!=sys.stdout: + log.info("wrote %s bytes to %s", f.tell(), filename) with lock: f.close() f = None diff --git a/src/xpra/sound/wrapper.py b/src/xpra/sound/wrapper.py new file mode 100644 index 0000000000..ee2a4da878 --- /dev/null +++ b/src/xpra/sound/wrapper.py @@ -0,0 +1,396 @@ +# This file is part of Xpra. +# Copyright (C) 2015 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import os +import sys +import signal +import time +import subprocess + +import gobject +gobject.threads_init() + +from xpra.platform.paths import get_sound_executable +from xpra.os_util import Queue, setbinarymode +from xpra.util import AdHocStruct +from xpra.log import Logger +log = Logger("sound") +DEBUG_SOUND = os.environ.get("XPRA_SOUND_DEBUG", "0")=="1" +SUBPROCESS_DEBUG = os.environ.get("XPRA_SOUND_SUBPROCESS_DEBUG", "").split(",") +EXPORT_INFO_TIME = int(os.environ.get("XPRA_SOUND_INFO_TIME", "1000")) + + +#this wrapper takes care of launching src.py or sink.py +#wrapped so that we can interact with them using a standard xpra protocol layer +#it is generic enough to be used with other processes +# +#the command line should look something like: +# xpra MODE IN OUT PLUGIN PLUGIN_OPTIONS CODECS CODEC_OPTIONS VOLUME +# * MODE can be _sound_record or _sound_play +# * IN is where we read the encoded commands from, specify "-" for stdin +# * OUT is where we write the encoded output stream, specify "-" for stdout +# * PLUGIN is the sound source (for recording) or sink (for playing) to use, can be omitted (will be auto detected) +# ie: pulsesrc, autoaudiosink +# * PLUGIN_OPTIONS is a string containing options specific to this plugin +# ie: device=somedevice,otherparam=somevalue +# * CODECS: the list of codecs that we are willing to support +# ie: mp3,flac +# * CODECS_OPTIONS: a string containing options to apply to the codec +# ie: blocksize=1024,otherparam=othervalue +# * VOLUME: optional, a number from 0.0 to 1.0 +# ie: 1.0 +# FIXME: CODEC_OPTIONS should allow us to specify different options for each CODEC +# The output will be a regular xpra packet, containing serialized signals that we receive +# The input can be a regular xpra packet, those are converted into method calls + +#to make it possible to inspect files (more human readable): +HEXLIFY_PACKETS = os.environ.get("XPRA_HEXLIFY_PACKETS", "0")=="1" +#use a packet encoder on the data: +ENCODE_PACKETS = os.environ.get("XPRA_ENCODE_PACKETS", "1")=="1" + + +#by default we just print the exported signals: +def printit(*args): + log.info("export %s", [str(x)[:128] for x in args]) + +export_callback = printit + +def export(*args): + global export_callback + signame = args[-1] + data = args[1:-1] + export_callback(*([signame] + list(data))) + + +def run_sound(mode, error_cb, options, args): + assert len(args)>=6, "not enough arguments" + mainloop = gobject.MainLoop() + + #common to both sink and src: + signal_handlers = { + "state-changed" : export, + "bitrate-changed" : export, + "error" : export, + "new-stream" : export, + } + #these definitions should probably be introspected somehow: + #(to make it more generic / abstracted) + functions = ["set_volume", "stop"] + if mode=="_sound_record": + from xpra.sound.src import SoundSource + gst_wrapper = SoundSource + signal_handlers["new-buffer"] = export + elif mode=="_sound_play": + from xpra.sound.sink import SoundSink + gst_wrapper = SoundSink + def eos(*args): + gobject.idle_add(mainloop.quit) + signal_handlers["eos"] = eos + signal_handlers["underrun"] = export + signal_handlers["overrun"] = export + functions += ["add_data"] + else: + raise Exception("unknown mode: %s" % mode) + + #the plugin to use (ie: 'pulsesrc' for src.py or 'autoaudiosink' for sink.py) + plugin = args[2] + #plugin options (ie: "device=monitor_device,something=value") + from xpra.sound.gstreamer_util import parse_element_options + options = parse_element_options(args[3]) + #codecs: + codecs = [x.strip() for x in args[4].split(",")] + #codec options: + codec_options = parse_element_options(args[5]) + #volume (optional): + try: + volume = int(args[6]) + except: + volume = 1.0 + + #figure out where we read from and write to: + input_filename = args[0] + if input_filename=="-": + #disable stdin buffering: + _input = os.fdopen(sys.stdin.fileno(), 'r', 0) + setbinarymode(_input.fileno()) + else: + _input = open(input_filename, 'rb') + output_filename = args[1] + if output_filename=="-": + #disable stdout buffering: + _output = os.fdopen(sys.stdout.fileno(), 'w', 0) + setbinarymode(_output.fileno()) + else: + _output = open(output_filename, 'wb') + + try: + pipeline = gst_wrapper(plugin, options, codecs, codec_options, volume) + + def stop(): + pipeline.cleanup() + mainloop.quit() + + def handle_signal(*args): + gobject.idle_add(stop) + + if ENCODE_PACKETS: + from xpra.net.bytestreams import TwoFileConnection + conn = TwoFileConnection(_output, _input, abort_test=None, target=mode, info=mode, close_cb=stop) + conn.timeout = 0 + from xpra.net.protocol import Protocol + def process_packet(proto, packet): + #log("process_packet(%s, %s)", proto, str(packet)[:128]) + command = packet[0] + if command==Protocol.CONNECTION_LOST: + log("connection-lost: %s, terminating", packet[1:]) + stop() + return + method = getattr(pipeline, command, None) + if not method: + log.warn("unknown command: %s", command) + return + if DEBUG_SOUND: + log("calling %s.%s%s", pipeline, command, str(tuple(packet[1:]))[:128]) + gobject.idle_add(method, *packet[1:]) + + queue = Queue() + def get_packet_cb(): + try: + item = queue.get(False) + except: + item = None + return (item, None, None, queue.qsize()>0) + protocol = Protocol(gobject, conn, process_packet, get_packet_cb=get_packet_cb) + protocol.large_packets = ["new-buffer"] + try: + protocol.enable_encoder("rencode") + except Exception as e: + log.warn("failed to enable rencode: %s", e) + protocol.enable_compressor("none") + protocol.start() + global export_callback + def send_via_protocol(*args): + if HEXLIFY_PACKETS: + import binascii + args = args[:1]+[binascii.hexlify(str(x)[:32]) for x in args[1:]] + log("send_via_protocol: adding '%s' message (%s items already in queue)", args[0], queue.qsize()) + queue.put(args) + protocol.source_has_more() + export_callback = send_via_protocol + #export signal before shutting down: + from xpra.os_util import SIGNAMES + def handle_signal(sig, frame): + signame = SIGNAMES.get(sig, sig) + log("handle_signal(%s, %s)", signame, frame) + send_via_protocol("signal", signame) + #give time for the network layer to send the signal + time.sleep(0.1) + stop() + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + for x,handler in signal_handlers.items(): + log("registering signal %s", x) + pipeline.connect(x, handler, x) + + if EXPORT_INFO_TIME>0: + def export_info(): + send_via_protocol("info", pipeline.get_info()) + gobject.timeout_add(EXPORT_INFO_TIME, export_info) + + gobject.idle_add(pipeline.start) + mainloop.run() + return 0 + except Exception as e: + log.error("run_sound%s error", (mode, error_cb, options, args), exc_info=True) + return 1 + finally: + if _input!=sys.stdin: + try: + _input.close() + except: + pass + if _output!=sys.stdout: + try: + _output.close() + except: + pass + + +class sound_subprocess_wrapper(object): + + def __init__(self): + self.state = "stopped" + self.codec = "unknown" + self.codec_description = "" + self.process = None + self.protocol = None + self.command = None + self.send_queue = Queue() + self.signal_callbacks = {} + self.last_info = {} + #hook some default packet handlers: + from xpra.net.protocol import Protocol + self.connect("state-changed", self.state_changed) + self.connect("info", self.info_update) + self.connect(Protocol.CONNECTION_LOST, self.connection_lost) + + + def state_changed(self, sink, new_state): + self.state = new_state + + def get_state(self): + return self.state + + def get_info(self): + return self.last_info + + def info_update(self, sink, info): + self.last_info = info + self.last_info["time"] = int(time.time()) + self.codec_description = info.get("codec_description") + + def set_volume(self, v): + self.send("set_volume", int(v*100)) + + def get_volume(self): + return self.last_info.get("volume", 100)/100.0 + + + + def start(self): + log("starting sound source using %s", self.command) + kwargs = {} + if os.name=="posix": + kwargs["close_fds"] = True + self.process = subprocess.Popen(self.command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr.fileno(), **kwargs) + #make a connection using the process stdin / stdout + from xpra.net.bytestreams import TwoFileConnection + def sound_process_exit(): + log("sound_process_exit()") + conn = TwoFileConnection(self.process.stdin, self.process.stdout, abort_test=None, target="sound", info="sound", close_cb=sound_process_exit) + conn.timeout = 0 + from xpra.net.protocol import Protocol + self.protocol = Protocol(gobject, conn, self.process_packet, get_packet_cb=self.get_packet) + self.protocol.large_packets = ["new-buffer", "add_data"] + self.protocol.enable_encoder("rencode") + self.protocol.enable_compressor("none") + self.protocol.start() + + + def cleanup(self): + #TODO: rename in callers? + self.stop() + + def stop(self): + log("%s.stop()", self) + if self.process: + try: + self.process.terminate() + self.protocol.close() + except Exception as e: + log.warn("failed to stop sound process %s: %s", self.process, e) + + def connection_lost(self, *args): + log("connection_lost%s", args) + self.stop() + + def get_packet(self): + try: + item = self.send_queue.get(False) + except: + item = None + return (item, None, None, self.send_queue.qsize()>0) + + def send(self, *packet_data): + assert self.protocol + self.send_queue.put(packet_data) + self.protocol.source_has_more() + + def process_packet(self, proto, packet): + if DEBUG_SOUND: + log("process_packet(%s, %s)", proto, [str(x)[:32] for x in packet]) + command = packet[0] + callbacks = self.signal_callbacks.get(command) + log("process_packet callbacks(%s)=%s", command, callbacks) + if callbacks: + for cb, args in callbacks: + try: + all_args = list(packet[1:]) + args + cb(self, *all_args) + except Exception as e: + log.error("error processing callback %s for %s packet: %s", cb, command, e, exc_info=True) + + def connect(self, signal, cb, *args): + self.signal_callbacks.setdefault(signal, []).append((cb, list(args))) + + def _add_debug_args(self): + from xpra.log import debug_enabled_categories + debug = SUBPROCESS_DEBUG[:] + if (DEBUG_SOUND or "sound" in debug_enabled_categories) and ("sound" not in debug): + debug.append("sound") + if debug: + #forward debug flags: + self.command += ["-d", ",".join(debug)] + +class source_subprocess_wrapper(sound_subprocess_wrapper): + + def __init__(self, plugin, options, codecs, volume, element_options): + sound_subprocess_wrapper.__init__(self) + self.command = [get_sound_executable(), "_sound_record", "-", "-", plugin or "", "", ",".join(codecs), "", str(volume)] + self._add_debug_args() + + def __repr__(self): + return "source_subprocess_wrapper(%s)" % self.process + + +class sink_subprocess_wrapper(sound_subprocess_wrapper): + + def __init__(self, plugin, options, codec, volume, element_options): + sound_subprocess_wrapper.__init__(self) + self.codec = codec + self.command = [get_sound_executable(), "_sound_play", "-", "-", plugin or "", "", codec, "", str(volume)] + self._add_debug_args() + + def add_data(self, data, metadata): + if DEBUG_SOUND: + log("add_data(%s bytes, %s) forwarding to %s", len(data), metadata, self.protocol) + self.send("add_data", data, dict(metadata)) + + def __repr__(self): + return "sink_subprocess_wrapper(%s)" % self.process + + +def start_sending_sound(sound_source_plugin, codec, volume, remote_decoders, remote_pulseaudio_server, remote_pulseaudio_id): + log("start_sending_sound%s", (sound_source_plugin, codec, volume, remote_decoders, remote_pulseaudio_server, remote_pulseaudio_id)) + from xpra.sound.gstreamer_util import has_gst, parse_sound_source + assert has_gst + try: + #info about the remote end: + remote = AdHocStruct() + remote.pulseaudio_server = remote_pulseaudio_server + remote.pulseaudio_id = remote_pulseaudio_id + remote.remote_decoders = remote_decoders + plugin, options = parse_sound_source(sound_source_plugin, remote) + if not plugin: + log.error("failed to setup '%s' sound stream source", (sound_source_plugin or "auto")) + return None + log("parsed '%s':", sound_source_plugin) + log("plugin=%s", plugin) + log("options=%s", options) + return source_subprocess_wrapper(plugin, options, remote_decoders, volume, {}) + except Exception as e: + log.error("error setting up sound: %s", e, exc_info=True) + return None + + +def start_receiving_sound(codec): + log("start_receiving_sound(%s)", codec) + try: + return sink_subprocess_wrapper(None, {}, codec, {}, 1.0) + except: + log.error("failed to start sound sink", exc_info=True) + return None