-
Notifications
You must be signed in to change notification settings - Fork 34
/
ssh-r-sync
executable file
·387 lines (337 loc) · 16.5 KB
/
ssh-r-sync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
import os, sys, contextlib, collections, subprocess, socket, pwd
import hashlib, math, json, base64, signal, time, re, tempfile, pathlib as pl
def p_fmt(fmt, *a, **k):
    """Turn print-style arguments into a list of positional args for print().

    A string `fmt` is run through str.format() when any extra arguments
    are given and is returned as-is otherwise, wrapped in a one-element list.
    A non-string `fmt` yields two elements: a list of all positional values
    and the dict of keyword values.
    """
    if not isinstance(fmt, str):
        return [[fmt, *a], k]
    if a or k:
        fmt = fmt.format(*a, **k)
    return [fmt]


def p(fmt, *a, **k):
    """Print a p_fmt()-formatted message to stderr and flush it right away."""
    return print(*p_fmt(fmt, *a, **k), file=sys.stderr, flush=True)
class SSHRsyncConfig:
    """Default settings for the ssh tunnel, handshake and local rsync daemon.

    Values are plain class attributes, so they can be overridden per instance.
    The mutable ssh_opts list is copied for each instance, so that in-place
    edits (clear/extend) on one config object do not leak into the class
    defaults or into other instances.
    """

    # recv_port_* settings should be same on server/client
    recv_port_bind = '127.0.0.1' # bind address used in the "ssh -R" forwarding spec
    recv_port_range = 22000, 22999 # min/max for the name-derived tunnel port
    recv_port_retries = 4 # extra attempts (with different ports) after the first one
    recv_port_hash_key = b'J\xbc\xed\xa3\xa5\x02E(\xde\xdc#h\xa4\xa5\xa4t' # blake2b key for port hash

    hs_hello = b'ssh-r-sync o/ 1' # greeting line that both sides must send verbatim
    hs_timeout = 20 # seconds to wait for each handshake step
    hs_key_size = 64 # bytes of random key material, same as blake2b digest size

    rsync_bind = '127.0.0.1' # local address for rsync daemon and tunnel target
    rsync_ready_wait = 10, 3.0 # attempts, timeout

    ssh_opts = '''
        -oControlPath=none -oControlMaster=no
        -oConnectTimeout=180 -oServerAliveInterval=6 -oServerAliveCountMax=10
        -oBatchMode=yes -oPasswordAuthentication=no -oNumberOfPasswordPrompts=0
        -oExitOnForwardFailure=yes -T'''.split()

    def __init__(self):
        # Private copy - callers modify this list in place
        self.ssh_opts = list(type(self).ssh_opts)
class SSHRsyncError(Exception):
    """Error whose message is built from print-style (fmt, *args) arguments."""

    def __init__(self, *args):
        # p_fmt() does the str.format() work, first element is the message
        formatted = p_fmt(*args)
        super().__init__(str(formatted[0]))
def str_part(s, sep, default=None):
    """Split string `s` in two on a separator, with a default for the optional part.

    `sep` is the separator with "<" or ">" attached to mark the optional side:
    "<@" splits on the first "@" and defaults the left part,
    ":>" splits on the last ":" and defaults the right part.
    Returns a 2-element list if separator is found, a 2-tuple with `default` otherwise.

    Examples: str_part("user@host", "<@", "root"), str_part("host:port", ":>")
    """
    c = sep.strip('<>')
    optional_left = sep.strip(c) == '<'
    if c not in s:
        return (default, s) if optional_left else (s, default)
    return s.split(c, 1) if optional_left else s.rsplit(c, 1)
def b64_str(v):
    """Encode bytes as an urlsafe-base64 text string."""
    encoded = base64.urlsafe_b64encode(v)
    return encoded.decode()


def b64_bytes(v):
    """Decode an urlsafe-base64 text string back into bytes."""
    raw = v.encode()
    return base64.urlsafe_b64decode(raw)
def hash_to_int(name, retry, n_max, **blake2_kws):
    """Return an integer hash in the 0..n_max range from name/retry values.

    The seed is one byte of `retry` followed by the encoded `name`, which gets
    re-hashed with blake2b (configured via `blake2_kws`) up to 1000 times.
    Fixed-size chunks from the first 8 bytes of each digest are tried as
    candidates, and the first one that falls within range is returned.
    Raises ValueError if no candidate fits after all the rounds.
    """
    assert n_max > 0, n_max
    n_bits = math.ceil(math.log(n_max + 1, 2))
    n_bytes = math.ceil(n_bits / 8)
    # Bits to drop from each chunk, so that candidate has n_bits at most
    shift = (-n_bits) % 8
    chunk_count = 8 // n_bytes
    digest = retry.to_bytes(1, 'big') + name.encode()
    for _ in range(1000):
        digest = hashlib.blake2b(digest, **blake2_kws).digest()
        for idx in range(chunk_count):
            chunk = digest[idx * n_bytes:(idx + 1) * n_bytes]
            value = int.from_bytes(chunk, 'big') >> shift
            # Out-of-range values are rejected - (n % n_max) would add bias for small values
            if value <= n_max:
                return value
    raise ValueError('Failed to get int within range after many hashes')
@contextlib.contextmanager
def err_timeout(timeout, err_t, *err_args):
    """Context manager that raises err_t(*err_args) if its body runs longer than `timeout`.

    Uses SIGALRM handler and ITIMER_REAL timer, so it only works where
    signal handlers can be installed (main thread) and those are available.
    On exit, timer is always disarmed and the previous SIGALRM handler is put back,
    including SIG_DFL, which is a falsy value and so needs an explicit None-check.
    """
    def timeout_handler(sig, frm): raise err_t(*err_args)
    handler_prev = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout)
        # Sanity check on values of the timer that was replaced here
        # NOTE(review): presumably meant to catch some other timer being in use - confirm
        assert not delay or interval
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        # None is returned for handlers that were not installed from python and cannot be set back
        if handler_prev is not None: signal.signal(signal.SIGALRM, handler_prev)
def proc_close(proc, wait=1, wait_base=0.3):
    """Stop a subprocess by escalating: close stdin (if piped), terminate, kill.

    After each step, process is given some time to exit before trying the next one.
    Positive numeric `wait` is scaled by the step number (wait, 2*wait, ...),
    a list of values is used as per-step timeouts, and `wait_base`
    is the timeout for any step that does not have one.
    Does nothing if `proc` is empty or has exited already.
    """
    if not proc or proc.poll() is not None:
        return
    steps = [proc.terminate, proc.kill]
    if proc.stdin:
        steps.insert(0, proc.stdin.close)
    if isinstance(wait, (int, float)) and wait > 0:
        wait = [wait * k for k in range(1, len(steps) + 1)]
    timeouts = dict(enumerate(wait or []))
    for idx, step in enumerate(steps):
        with contextlib.suppress(OSError):
            step()
        try:
            proc.wait(timeouts.get(idx, wait_base))
        except subprocess.TimeoutExpired:
            continue # still running, escalate to the next step
        break
def temp_file(prefix, contents=None):
    """Create a named temporary text file, optionally pre-filled with `contents`.

    Returns open NamedTemporaryFile object, to be closed (and thus removed) by the caller.
    Contents are flushed, so that they can be read by path right away.
    File is closed here if writing to it fails.
    """
    handle = tempfile.NamedTemporaryFile(mode='w', prefix=prefix)
    if not contents:
        return handle
    try:
        handle.write(contents)
        handle.flush()
    except BaseException:
        handle.close()
        raise
    return handle
def retries_within_timeout( tries, timeout,
        backoff_func=lambda e,n: ((e**n-1)/e), slack=1e-2 ):
    """Return a list of `tries` increasing delays that add up to about `timeout`.

    Delays are backoff_func(e, n) for n in 0..tries-1, with base `e`
    found by bisection in the 0..timeout range,
    such that sum of all delays is within `slack` of `timeout`.
    Empty list is returned for zero timeout.

    If requested total cannot be reached (e.g. too few tries for the backoff
    function, or slack smaller than float precision), closest schedule that
    search has converged on is returned, instead of looping forever.
    """
    if timeout == 0: return list() # single attempt, regardless of tries
    lo, hi = 0, timeout
    while True:
        mid = (lo + hi) / 2
        delays = [backoff_func(mid, n) for n in range(tries)]
        error = sum(delays) - timeout
        if abs(error) < slack: return delays
        # Interval cannot be narrowed any further - target is unreachable, use what we have
        if mid == lo or mid == hi: return delays
        if error > 0: hi = mid
        else: lo = mid
# Pair of raw binary streams to/from the ssh process: write to "send", read from "recv"
SSHPipe = collections.namedtuple('SSHPipe', ['send', 'recv'])

# Parameters of the local rsync daemon, also passed to the remote side in the handshake
RsyncInfo = collections.namedtuple(
    'RsyncInfo', ['user', 'key', 'mod', 'su', 'opts'] )
def create_ssh_tunnel(ctx, conf, dst, port, name, rsync_info):
    """Start ssh with a reverse tunnel to a local port and run handshake over it.

    Binds a local socket to an OS-picked port, then runs "ssh -R" to `dst`
    with the remote-side port derived from `name` and attempt number,
    until a greeting line is read from ssh stdout or attempts run out.
    After that, sends hello + JSON line with name/port/key/rsync info to ssh stdin,
    accepts one connection on the local socket (expected to arrive through
    the tunnel) and exchanges key-derived tokens over it to check that both
    channels lead to the same peer.

    Args:
        ctx: contextlib.ExitStack, used for cleanup of sockets and ssh process.
        conf: SSHRsyncConfig-like object with recv_port_*, hs_*, rsync_bind and ssh_opts.
        dst: ssh destination, passed to ssh as-is.
        port: ssh port for -p option, not used if empty.
        name: local machine name, sent to remote and used to pick tunnel port.
        rsync_info: RsyncInfo tuple, sent to remote as a dict.

    Returns:
        (rsync_port, SSHPipe) tuple - local port that tunnel leads to
        (released here, to be bound by the caller), and raw streams to/from ssh process.

    Raises:
        SSHRsyncError: on handshake timeout, no greeting from any
            ssh attempt, greeting mismatch or tunnel key mismatch.
    """
    hs_timeout_ctx = ft.partial( err_timeout,
        conf.hs_timeout, SSHRsyncError, 'Timeout waiting for receiver' )

    sock = ctx.enter_context(
        socket.socket(socket.AF_INET, socket.SOCK_STREAM) )
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((conf.rsync_bind, 0)) # to get os-picked open port to tunnel to
    rsync_port = sock.getsockname()[1]
    sock.listen(1)

    port_min, port_max = conf.recv_port_range
    for attempt in range(conf.recv_port_retries+1):
        tun_port = port_min + hash_to_int( name, attempt,
            port_max - port_min, key=conf.recv_port_hash_key )
        ssh = subprocess.Popen(
            [ 'ssh', *(conf.ssh_opts or list()),
                '-R', f'{conf.recv_port_bind}:{tun_port}:{conf.rsync_bind}:{rsync_port}',
                *([f'-p{port}'] if port else []), dst ],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr )
        # Registered right away, so that ssh is stopped on timeouts and errors below too
        # It does nothing for processes that have exited already
        ctx.callback(proc_close, ssh)
        # Unbuffered views of the pipes - descriptors stay owned
        #  by Popen streams, and must not be closed twice via these objects
        ssh_recv = open(ssh.stdout.fileno(), 'rb', 0, closefd=False)
        ssh_send = open(ssh.stdin.fileno(), 'wb', 0, closefd=False)
        with hs_timeout_ctx(): hello = ssh_recv.readline().rstrip(b'\n')
        if hello: break
        # No greeting - stop this ssh and release its pipes before the next attempt
        proc_close(ssh)
        for pipe in ssh.stdin, ssh.stdout:
            with contextlib.suppress(OSError): pipe.close()
    else: raise SSHRsyncError('SSH connection failed')

    if hello != conf.hs_hello:
        raise SSHRsyncError( 'Hello-string mismatch:'
            ' local={!r} remote={!r}', conf.hs_hello, hello )

    key = os.urandom(conf.hs_key_size)
    info = dict( name=name, port=tun_port,
        key=b64_str(key), rsync=rsync_info._asdict() )
    with hs_timeout_ctx():
        ssh_send.write(conf.hs_hello + b'\n')
        ssh_send.write(json.dumps(info).encode() + b'\n')

    # Separate tokens for each direction, both derived from the key sent via ssh
    key_recv = hashlib.blake2b(key, person=b'server', key=conf.hs_hello).digest()
    key_push = hashlib.blake2b(key, person=b'client', key=conf.hs_hello).digest()
    assert len(key_recv) == len(key_push) == conf.hs_key_size, [len(key_recv), len(key_push)]

    sock.settimeout(conf.hs_timeout)
    sock_conn, sock_addr = sock.accept()
    ctx.callback(sock_conn.close)
    sock_conn.settimeout(conf.hs_timeout)
    sock_conn.sendall(key_push)
    # recv() can return less than requested, so read until full token or EOF
    key = b''
    while len(key) < conf.hs_key_size:
        chunk = sock_conn.recv(conf.hs_key_size - len(key))
        if not chunk: break
        key += chunk
    if len(key) != conf.hs_key_size or key != key_recv:
        raise SSHRsyncError('ssh/socket mismatch (len={}/{})', len(key), conf.hs_key_size)

    # Both are closed to free up rsync_port for the caller to use
    sock_conn.close()
    sock.close()
    return rsync_port, SSHPipe(ssh_send, ssh_recv)
def create_rsync_conf( ctx, path, rsync_info,
        filter_file=None, filter_dir_file=None, debug=False ):
    """Write temporary rsync daemon config for `path` and return its file name.

    Creates up to four temporary files - filter rules (only if any of the
    filter options are used), secrets file with user:key line, lock file and the
    config itself - all registered with the `ctx` ExitStack, to be removed on its exit.
    Config has global read-only/auth settings and a single module
    named rsync_info.mod, pointing to the resolved `path`.
    """
    path = pl.Path(path).resolve()

    filter_tmp = None
    if filter_file or filter_dir_file:
        filter_tmp = ctx.enter_context(
            tempfile.NamedTemporaryFile('w', prefix='.ssh-r-sync.filter.') )
        if filter_dir_file:
            filter_tmp.write(f'dir-merge {filter_dir_file}\n')
        if filter_file:
            filter_tmp.write(pl.Path(filter_file).read_text())
        filter_tmp.flush()

    secrets_tmp = ctx.enter_context(temp_file(
        '.ssh-r-sync.passwd.', f'{rsync_info.user}:{rsync_info.key}\n' ))
    lock_tmp = ctx.enter_context(temp_file('.ssh-r-sync.lock.'))

    # Global section
    lines = [
        'hosts allow = localhost',
        'read only = true',
        'numeric ids = true',
        'max connections = 1',
        f'lock file = {lock_tmp.name}',
        'timeout = 600',
        f'secrets file = {secrets_tmp.name}',
        f'auth users = {rsync_info.user}',
    ]
    if rsync_info.su:
        lines += ['uid = root', 'gid = *']
    else:
        lines += ['use chroot = no']
    if debug:
        lines += ['log file = /dev/stderr']

    # Module section
    lines += [f'[{rsync_info.mod}]', f' path = {path}']
    if filter_tmp:
        lines += [f' filter = merge {filter_tmp.name}']

    conf_tmp = ctx.enter_context(
        temp_file('.ssh-r-sync.conf.', '\n'.join(lines) + '\n') )
    return conf_tmp.name
def wait_for_socket(addr, tries, timeout):
    """Block until TCP connection to `addr` succeeds, retrying with increasing delays.

    Delays between attempts come from retries_within_timeout(tries, timeout),
    and each one is also used as connection timeout for its attempt.
    One last attempt with a short timeout is made after delays run out,
    and SSHRsyncError is raised if that one fails as well.
    """
    delays = iter(retries_within_timeout(tries, timeout))
    while True:
        delay = next(delays, None) # None = last attempt, no more delays left
        deadline = time.monotonic() + (delay or 0)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock_conn:
            sock_conn.settimeout(delay or 5e-2)
            try:
                sock_conn.connect(addr)
            except OSError: # refused, timeout, etc
                connected = False
            else:
                connected = True
        if connected:
            return
        if delay is None:
            raise SSHRsyncError('Timeout waiting for socket: {!r}', addr)
        # Sleep through the rest of the delay, if connection attempt failed early
        time.sleep(max(0, deadline - time.monotonic()))
def main(args=None):
    """Command-line entry point - parse options and run one backup session.

    Steps: parse arguments, set nice/ionice priorities if requested, pick
    machine name, write temporary rsync daemon config, open ssh reverse tunnel
    with handshake, start rsync daemon on the tunneled port, send "start" line
    to the remote side and wait for its "done" line.

    Args:
        args: list of command-line arguments, sys.argv[1:] is used if None.

    Raises:
        SSHRsyncError: on handshake/tunnel failures or missing completion ack.
            Excepthook that is installed here prints these as one-line errors.
        OSError: if setting priorities fails.
    """
    conf = SSHRsyncConfig()

    import argparse, textwrap
    dedent = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')
    class SmartHelpFormatter(argparse.HelpFormatter):
        # Keeps line breaks in multi-line description/help texts, wraps the rest as usual
        def __init__(self, *args, **kws):
            return super().__init__(*args, **kws, width=100)
        def _fill_text(self, text, width, indent):
            if '\n' not in text: return super()._fill_text(text, width, indent)
            return ''.join( indent + line
                for line in text.replace('\t', ' ').splitlines(keepends=True) )
        def _split_lines(self, text, width):
            return super()._split_lines(text, width)\
                if '\n' not in text else dedent(text).splitlines()

    parser = argparse.ArgumentParser(
        formatter_class=SmartHelpFormatter,
        usage='%(prog)s [options] [user@]host[:port] path',
        description=dedent('''
"ssh -R rsync" script to backup stuff via ssh/rsync.
It works by ssh'ing into remote backup server, requesting it to make a backup,
which then happens over same ssh connection, using reverse-tunnel opened there.
ssh console channel is used to send backup parameters.
This avoids following problematic things:
- Pushing stuff to remote, which can be exploited to delete things.
- Using insecure network channels and/or rsync auth - ssh only.
- Having any kind of insecure auth or port open on backup server - ssh only.
- Requiring backed-up machine to be accessible on the net for backup-pulls.
On the backup-server side, only one
locked-down single-command ssh account is necessary.
To get rsync with full fs access, use -r/--rsync option and binary with
cap_dac_read_search (read) / cap_dac_override (write) posix capabilities.
Idea is to for backup process to be as simple as ssh'ing into remote host.'''))

    group = parser.add_argument_group('Basic required arguments')
    group.add_argument('host', metavar='[user@]host[:port]',
        help='''
Destination [user@]host[:port] spec for ssh command.
If "user" part is omitted, current local user name will be added.''')
    group.add_argument('path', help='Local path to backup.')

    group = parser.add_argument_group('Rsync/backup options')
    group.add_argument('-r', '--rsync', metavar='path',
        help='''
Path to custom rsync binary, e.g. one with all necessary posix caps enabled.
If specified, elevated privileges for rsync are assumed, and parameters like -HaAXS are used.
Passing -u/--rsync-user option allows to disable this
behavior and only copy stuff that regular users can access.''')
    group.add_argument('-u', '--rsync-user', action='store_true',
        help='Do not assume elevated privileges when -r/--rsync path is specified.')
    group.add_argument('-n', '--name', metavar='name',
        help='Local machine name, which is used for backup'
            ' cleanup and deduplication. Picked as uname.nodename, if not specified.')
    group.add_argument('--name-rpi', action='store_true',
        help='Use "Serial" field from /proc/cpuinfo,'
            ' provided there by RPi kernel, as board-specific name.')
    group.add_argument('--name-file', metavar='path',
        help='Read specified file contents as the name, stripping spaces, slashes and such.'
            ' E.g. /sys/class/dmi/id/board_serial can be used on x86 platforms, if accessible.')
    group.add_argument('-f', '--filter-file', metavar='path',
        help='Local file to use as rsync daemon filter rules,'
            ' to be passed as "filter = merge <file>" in its config.')
    group.add_argument('-d', '--filter-dir-file', metavar='name',
        help='Simplified form of -f/--filter-file,'
            ' adding "dir-merge <name>" to rsync filter rule.'
            ' Same as using one-liner --filter-file with that directive.')
    group.add_argument('-x', '--one-file-system', action='store_true',
        help='Instruct receiver rsync-pull to use -x/--one-file-system option.')
    group.add_argument('-X', '--fake-super', action='store_true',
        help='Instruct receiver rsync-pull to use --fake-super option.')
    group.add_argument('-t', '--handshake-timeout',
        type=float, metavar='seconds', default=conf.hs_timeout,
        help='Timeout waiting for handshake responses from remote side (default: %(default)ss).')

    group = parser.add_argument_group('SSH options')
    group.add_argument('-p', '--port', metavar='port',
        help='Alternative way to specify port, similar to ssh binary.')
    group.add_argument('-k', '--ssh-key', metavar='path', help='Path to ssh key file to use.')
    group.add_argument('--no-ssh-opts', action='store_true',
        help='''
Do not pass any advanced -o<stuff> ssh options which script does by default.
ssh will still read stuff from ~/.ssh/config, so any new ones can be specified there.
Default options are:\n{}'''.format('\n'.join(textwrap.wrap(
            ' '.join(conf.ssh_opts), width=90, initial_indent='\t'*4, subsequent_indent='\t'*4 ))))

    group = parser.add_argument_group('Misc options')
    group.add_argument('--nice', metavar='[prio:][io-class[.io-level]]',
        help='''
Set "nice" and/or "ionice" (CFQ I/O) priorities, inherited by hooks and rsync.
"nice" prio value, if specified, must be
in -20-20 range, where lower = higher prio, and base=0.
"ionice" value should be in class[.level] format, where
"class" is one of [rt, be, idle] and "level" in 0-7 range (0=highest prio).
See setpriority(2) / ioprio_set(2) for more info.''')
    group.add_argument('--debug', action='store_true', help='Verbose operation mode.')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    # Expected errors are printed as a single line, anything else gets full traceback
    def print_error_msg_only(err_t, err, err_tb):
        if isinstance(err, SSHRsyncError):
            print(f'ERROR: {err}', file=sys.stderr)
        else: sys.__excepthook__(err_t, err, err_tb)
    sys.excepthook = print_error_msg_only

    if opts.handshake_timeout: conf.hs_timeout = opts.handshake_timeout

    # sys.exit() from signal handlers is to unwind the stack and run all cleanup callbacks
    for sig in 'int hup term'.upper().split():
        signal.signal(getattr(signal, f'SIG{sig}'), lambda sig,frm: sys.exit(1))

    if opts.nice is not None:
        nice, ionice = (opts.nice + ':').split(':', 1)
        ionice = ionice.rstrip(':')
        # Value without ":" is "nice" prio only if it is an integer (can be negative)
        if not ionice and not re.fullmatch(r'[-+]?\d+', nice): nice, ionice = None, nice
        if nice: os.setpriority(os.PRIO_PROCESS, os.getpid(), int(nice))
        if ionice:
            # grep -r ioprio_set /usr/share/gdb/syscalls/
            syscall_id = os.uname().machine
            try: syscall_id = dict(x86_64=251, armv7l=314, aarch64=30)[syscall_id]
            except KeyError: parser.error(f'--nice ionice is not supported on arch: {syscall_id}')
            io_class, _, io_level = ionice.partition('.')
            try: io_class = dict(rt=1, be=2, idle=3)[io_class.lower()]
            except KeyError:
                parser.error(f'--nice ionice class must be one of rt, be, idle: {io_class!r}')
            if io_class == 3: io_level = 0 # level is not used with "idle" class
            else:
                try: io_level = int(io_level or 0)
                except ValueError: io_level = -1
                if not 0 <= io_level <= 7:
                    parser.error('--nice ionice prio level must be in 0-7 range')
            ionice = (io_class << 13) | io_level
            import ctypes as ct
            # Second argument (1) is passed as "which" value for ioprio_set, pid as "who"
            err = ct.CDLL('libc.so.6', use_errno=True).syscall(syscall_id, 1, os.getpid(), ionice)
            if err != 0:
                err = ct.get_errno()
                raise OSError(err, 'ionice_set failed - ' + os.strerror(err))

    ssh_dst, ssh_port = str_part(opts.host, ':>')
    if opts.port:
        if ssh_port and ssh_port != opts.port:
            parser.error('Two different ssh ports specified via -p/--port option and in destination')
        ssh_port = opts.port

    if opts.name_rpi:
        with open('/proc/cpuinfo') as src:
            for line in src:
                m = re.search(r'^\s*Serial\s*:\s*(\S+)\s*$', line)
                if m: break
            else: parser.error('Failed to find "Serial : ..." line in /proc/cpuinfo (non-RPi kernel?)')
        name = m.group(1)
    elif opts.name_file:
        name = pl.Path(opts.name_file).read_text().strip()
        # Replacements are applied in this order, to get a name without special characters
        for sub_re, sub in {
                r'[\\/]': '_', r'^\.+': '_', r'[\x00-\x1f]': '_', r':': '-_',
                r'<': '(', r'>': ')', r'\*': '+', r'[|!"]': '-', r'[\?\*]': '_',
                '[\'’]': '', r'\.+$': '_', r'\s+$': '', r'\s': '_' }.items():
            name = re.sub(sub_re, sub, name)
    else: name = opts.name or os.uname().nodename

    if opts.no_ssh_opts: conf.ssh_opts.clear()
    if opts.ssh_key: conf.ssh_opts.extend(['-i', opts.ssh_key])

    if opts.debug: p(f'----- sync name={name} dst={ssh_dst}:{ssh_port or 22}')

    with contextlib.ExitStack() as ctx:
        rsync_info = RsyncInfo(
            key=b64_str(os.urandom(6)), user='sshrsync', mod='sshrsync',
            su=(os.getuid() == 0 or opts.rsync) and not opts.rsync_user,
            opts=dict(one_file_system=opts.one_file_system, fake_super=opts.fake_super) )
        rsync_conf_path = create_rsync_conf( ctx, opts.path, rsync_info,
            filter_file=opts.filter_file, filter_dir_file=opts.filter_dir_file, debug=opts.debug )

        rsync_port, ssh = create_ssh_tunnel(ctx, conf, ssh_dst, ssh_port, name, rsync_info)

        # rsync daemon is started on the port that tunnel was set up for
        rsync = pl.Path(opts.rsync).expanduser().resolve() if opts.rsync else 'rsync'
        rsync = subprocess.Popen([ rsync, '--daemon', '--no-detach',
            f'--address={conf.rsync_bind}', f'--port={rsync_port}', f'--config={rsync_conf_path}' ])
        ctx.callback(proc_close, rsync)
        wait_for_socket((conf.rsync_bind, rsync_port), *conf.rsync_ready_wait)

        ssh.send.write(b'start\n')
        ack = ssh.recv.readline().rstrip(b'\n')
        if ack != b'done': raise SSHRsyncError('Invalid/missing backup completion ack: {!r}', ack)


if __name__ == '__main__': sys.exit(main())