forked from mk-fg/fgtk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pa_track_history
executable file
·191 lines (167 loc) · 6.74 KB
/
pa_track_history
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
import os, sys, string, re, time
track_max_len = 16384
def get_bus(srv_addr=None):
import dbus
if srv_addr is None:
srv_addr = os.environ.get('PULSE_DBUS_SERVER')
if not srv_addr\
and os.access('/run/pulse/dbus-socket', os.R_OK | os.W_OK):
# Well-known system-wide daemon socket
srv_addr = 'unix:path=/run/pulse/dbus-socket'
if not srv_addr:
srv_addr = dbus.SessionBus().get_object(
'org.PulseAudio1', '/org/pulseaudio/server_lookup1')\
.Get('org.PulseAudio.ServerLookup1',
'Address', dbus_interface='org.freedesktop.DBus.Properties')
return dbus.connection.Connection(srv_addr)
def dbus_bytes(dbus_arr, strip='\0' + string.whitespace):
return bytes(bytearray(dbus_arr).strip(strip))
def _track_for_pid(pid, srv_addr=None):
bus = get_bus(srv_addr=srv_addr)
streams = bus.get_object(object_path='/org/pulseaudio/core1')\
.Get( 'org.PulseAudio.Core1', 'PlaybackStreams',
dbus_interface='org.freedesktop.DBus.Properties' )
streams = list( bus.get_object(object_path=path)\
.Get('org.PulseAudio.Core1.Stream', 'PropertyList')
for path in streams )
if not streams: return None
for stream in streams:
stream_pid = int(dbus_bytes(stream['application.process.id']))
if stream_pid == pid: return dbus_bytes(stream['media.name'])
else: log.warn('Failed to find pa stream for pid: {}'.format(pid))
child_ctl = None
def track_for_pid(pid, srv_addr=None, dbus_reuse=False):
if dbus_reuse:
return _track_for_pid(pid, srv_addr=srv_addr)
# Forking here is to prevent dbus module from keeping state,
# which seem to prevent it from working on any pulseaudio hiccup
global child_ctl
if child_ctl: os.close(child_ctl)
(r1,w1), (r2,child_ctl) = os.pipe(), os.pipe()
child_pid, track = os.fork(), None
if child_pid: # parent
os.close(w1), os.close(r2)
try:
track = os.read(r1, track_max_len + 1)
os.close(r1)
os.write(child_ctl, 'x')
except (OSError, IOError): pass
try: os.waitpid(child_pid, 0)
except OSError: pass
else: # child
os.close(r1), os.close(child_ctl)
try:
track = _track_for_pid(pid, srv_addr=srv_addr)
os.write(w1, track or '\0')
assert os.read(r2, 1) == 'x'
except Exception as err:
log.warn('Failed to query pa via dbus: %s', err)
os._exit(0)
return track if track and track != '\0' else None
def proc_list():
import glob
cmds = list()
for p in glob.glob('/proc/*/cmdline'):
try: pid = int(p.split('/', 3)[2])
except ValueError: continue # e.g. "self"
try:
with open(p) as src: cmd = src.read()
except (OSError, IOError): continue
cmd = cmd.strip('\0')
if cmd: cmds.append((pid, cmd.split('\0')))
return cmds
def proc_match(regexp, procs=None):
if procs is None: procs = proc_list()
for pid, cmd in procs:
cmd = ' '.join(cmd)
if re.search(regexp, cmd): return pid
def track_dump_loop(opts):
track_last = None
if opts.dst_file:
dst = open(opts.dst_file, 'a+b')
dst.seek(max(0, os.fstat(dst.fileno()).st_size - (track_max_len + 2)))
last_line = dst.read()
if '\n' in last_line:
track_last = last_line.rstrip('\r\n').rsplit('\n', 1)[-1].strip()
else: dst = sys.stdout
pid, ts = None, time.time()
while True:
if not opts.prog:
if not pid: pid = proc_match(opts.pgrep) if not opts.pgrep.isdigit() else int(opts.pgrep)
else:
player_poll = player.poll() if pid else None
if not pid or player_poll == 0:
global err_hook, devnull
from subprocess import Popen
player = dict()
if opts.quiet:
if not devnull: devnull = open(os.devnull, 'wb')
player.update(stdout=devnull, stderr=devnull)
player = Popen(opts.prog, **player)
pid, err_hook = player.pid, lambda p=player: p.poll() is None and p.terminate()
elif pid and player_poll is not None:
log.error('Player app failed (exit code: %s), exiting', player_poll)
return player_poll
if pid is not None:
track = track_for_pid(pid, dbus_reuse=opts.once)
if track:
if opts.strip: track = re.sub(opts.strip, '', track)
if track and not (track_last and track_last.endswith(track)):
track_last = track
if opts.timestamp: prefix = time.strftime(opts.timestamp_format)
else: prefix = ''
dst.write('{}{}\n'.format(prefix, track))
dst.flush()
else:
log.error('Failed to get stream pid')
if opts.once: break
ts, ts_to = time.time(), ts + opts.poll_interval
while ts_to <= ts: ts_to += opts.poll_interval
time.sleep(ts_to - ts)
ts = ts_to
def main(args=None):
import argparse
parser = argparse.ArgumentParser(
description='Record whatever is playing in some pid via pulse to some file.'
' Takes "media.name" parameter from PulseAudio Stream and records any changes to it.')
parser.add_argument('prog', nargs='*', help='Playback app to run.'
' Use "--" to make sure its options wont get processed by this wrapper script.')
parser.add_argument('-d', '--dst-file',
help='Path to a file to record all the stuff to. If omitted, stdout will be used.')
parser.add_argument('-t', '--timestamp', action='store_true',
help='Prepend timestamps to each track entry in the output.')
parser.add_argument('--timestamp-format',
metavar='py_ts_format', default='[%Y-%m-%d %H:%M] ',
help='Format for timestamp-prefix to be prepended to each line (default: %(default)s).'
' Should be compatible with pythons strftime() functions.')
parser.add_argument('-1', '--once', action='store_true', help='Sample once and exit.')
parser.add_argument('-i', '--poll-interval',
type=float, metavar='seconds', default=80,
help='Interval between sampling track name (default: %(default)s).')
parser.add_argument('-p', '--pgrep',
metavar='{pid | cmdline_regexp}', default=r'^mpv\s+',
help='Grep for this regexp in processes'
' to find pid that is used as a player (default: %(default)s).'
' If integer is specified, it will be used as a pid without any extra matching.'
' Disregarded, if playback app is specified in args.'
' Args in matched cmdlines are separated by spaces. Pid only gets matched on script start.')
parser.add_argument('-s', '--strip',
metavar='regexp', default=r'^mpv\s+-\s+',
help='Regexp for bits to strip from produced title (default: %(default)s).'
' Can be set to empty string to not strip anything.')
parser.add_argument('-q', '--quiet',
action='store_true', help='Supress all output from started player pid.')
parser.add_argument('--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
global log, err_hook, devnull
import logging
logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
log = logging.getLogger()
err_hook = devnull = None
try: return track_dump_loop(opts)
finally:
if err_hook is not None: err_hook()
if __name__ == '__main__': sys.exit(main())