-
Notifications
You must be signed in to change notification settings - Fork 34
/
lsx
executable file
·278 lines (251 loc) · 11.2 KB
/
lsx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
#!/usr/bin/env python
import os, sys, stat, re, pathlib as pl, collections as cs
p_err = lambda *a,**kw: print('ERROR:', *a, **kw, file=sys.stderr, flush=True)
def list_adjacent(paths, specs, files_only=False):
p_dirs, stats, p_out = dict(), dict(), dict()
op_out_sort = None
for p in paths:
try: st = stats[p] = p.stat()
except OSError: p_err(f'path inaccessible [ {p} ]'); continue
ps = set(p.parent.iterdir())
if files_only:
if not stat.S_ISREG(st.st_mode): continue # arg path discarded
for pa in list(ps):
try: st = stats[pa] = pa.stat()
except OSError: ps.remove(pa)
if not stat.S_ISREG(st.st_mode): ps.remove(pa)
p_out[p], p_dirs[p] = {p}, tuple(ps)
for opt in (opt.lower() for opt in specs):
if n := re.findall(r'(\d+)', opt): n = max(map(int, n))
else: n = 10
ops = set(opt).difference('1234567890')
if ops.difference('bants'):
parser.error(f'Unrecognized letters in -a/--adjacent spec: {opt!r}')
if ('a' in ops) ^ ('b' in ops) == 0: na = nb = n
elif 'a' in ops: na, nb = n, 0
elif 'b' in ops: na, nb = 0, n
ops.difference_update('ab')
if not ops: ops.add('n')
elif not op_out_sort:
op_out_sort = sorted(ops, key=lambda k: opt.find(k))[0]
p_lists = cs.defaultdict(set) # {p: set(ps_sorted, ...), ...}
for p, ps in p_dirs.items():
if 'n' in ops:
p_lists[p].add(tuple(sorted(ps, key=lambda pa: pa.name)))
if ops.intersection('ts'):
for pa in ps:
if pa in stats: continue
try: stats[pa] = pa.stat()
except OSError: pass
for k, sk in ('t', 'st_mtime'), ('s', 'st_size'):
if k not in ops: continue
p_lists[p].add(tuple(sorted( ps,
key=lambda pa: getattr(stats[pa], sk) )))
for p, pss in p_lists.items():
for ps in pss:
try: n = ps.index(p)
except ValueError: p_err(f'path vanished [ {p} ]'); continue
p_out[p].update(ps[max(0, n-nb):n])
p_out[p].update(ps[n+1:n+na+1])
ps_print, sort_func = dict(), dict(
n=lambda p: p.name,
t=lambda p: stats[p].st_mtime,
s=lambda p: stats[p].st_size )[op_out_sort or 'n']
for p, ps in p_out.items():
p_out[p] = sorted(ps, key=sort_func)
for ps in p_out.values():
for p in ps:
if p in ps_print: continue
if not (st := stats.get(p)):
try: st = stats[p] = p.stat()
except OSError: continue
ps_print[p] = None
return list(ps_print.keys())
def mtime_parse(mtimes, err_func):
import datetime as dt
ts_now = dt.datetime.now()
_td_days = dict(
y=365.25, yr=365.25, year=365.25,
mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
m=60, min=60, minute=60, s=1, sec=1, second=1 )
_td_usort = lambda d: sorted(
d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')
def _ts_parse(ts_str, rel=False):
ts = (ts_str := ts_str.strip()).lower() == 'now' and dt.timedelta(0)
no_ts = lambda: ts in [None, False]
if no_ts():
try: ts = dt.timedelta(seconds=float(ts_str))
except: pass
if no_ts() and ( # short time offset like "3d 5h"
(m := _td_re.search(ts_str)) and any(m.groups()) ):
delta = list()
for units in _td_days, _td_s:
val = 0
for k, v in units.items():
if not m.group(k): continue
val += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
delta.append(val)
ts = dt.timedelta(*delta)
if no_ts() and (m := re.search( # common BE format
r'^(?P<date>(?:\d{2}|(?P<Y>\d{4}))-\d{2}-\d{2})'
r'(?:[ T](?P<time>\d{2}(?::\d{2}(?::\d{2})?)?)?)?$', ts_str )):
tpl = 'y' if not m.group('Y') else 'Y'
tpl, tss = f'%{tpl}-%m-%d', m.group('date')
if m.group('time'):
tpl_time = ['%H', '%M', '%S']
tss += ' ' + ':'.join(tss_time := m.group('time').split(':'))
tpl += ' ' + ':'.join(tpl_time[:len(tss_time)])
try: ts = dt.datetime.strptime(tss, tpl)
except ValueError: pass
if no_ts() and (m := re.search( # just time without AM/PM - treat as 24h format
r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', ts_str )):
us, tpl = 0, ':'.join(['%H', '%M', '%S'][:len(ts_str.split(':'))])
if m.group('us'):
ts_str, us = ts_str.rsplit('.', 1)
us = us[:6] + '0'*max(0, 6 - len(us))
try: ts = dt.datetime.strptime(ts_str, tpl)
except ValueError: pass
else:
ts = ts_now.replace( hour=ts.hour,
minute=ts.minute, second=ts.second, microsecond=int(us) )
if ts > ts_now: ts -= dt.timedelta(1)
if no_ts(): # coreutils' "date" parses everything, but is more expensive to use
import subprocess as sp
while True:
res = sp.run( ['date', '+%s', '-d', ts_str],
stdout=sp.PIPE, stderr=sp.DEVNULL )
if res.returncode:
if ',' in ts_str: ts_str = ts_str.replace(',', ' '); continue
else:
ts = dt.datetime.fromtimestamp(int(res.stdout.strip()))
if 0 < (ts - ts_now).total_seconds() <= 24*3600 and re.search(
r'(?i)^[\d:]+\s*(am|pm)?\s*([-+][\d:]+|\w+|\w+[-+][\d:]+)?$', ts_str.strip() ):
ts -= dt.timedelta(1)
break
if no_ts(): err_func(f'Failed to parse -t/--mtime spec: {ts_str}')
if not rel and isinstance(ts, dt.timedelta): ts = ts_now - ts
return ts
if (ranges := any('/' in td for td in mtimes)) and not all('/' in td for td in mtimes):
err_func( '-t/--mtime values must either be'
' all deltas or time-ranges, cannot be a mix of both' )
for n, ts_str in enumerate(mtimes):
if ranges:
a, s, b = ts_str.partition('/')
mtimes[n] = sorted(_ts_parse(ts).timestamp() for ts in [a, b])
else:
if not isinstance(td := _ts_parse(ts_str, rel=True), dt.timedelta):
err_func(f'Absolute -t/--mtime specs can only be used in A/B ranges: {ts_str}')
mtimes[n] = td.total_seconds()
return mtimes, not ranges
def mtime_list_vicinity(paths, mtimes):
ps, td = list(), max(mtimes)
for p in paths:
try: ps.append((ts := p.stat().st_mtime, p))
except OSError: p_err(f'path inaccessible [ {p} ]'); continue
for p2 in p.parent.iterdir():
try: ts2 = p2.stat().st_mtime
except OSError: continue
if abs(ts - ts2) <= td: ps.append((ts2, p2))
return list(dict.fromkeys(p for ts, p in sorted(ps)))
def mtime_list_ranges(paths, mtimes):
ps = list()
for p in paths:
try: ts = p.stat().st_mtime
except OSError: continue
for a, b in mtimes:
if a <= ts <= b: ps.append((ts, p))
return list(dict.fromkeys(p for ts, p in sorted(ps)))
def main(argv=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
usage='%(prog)s [opts] [[--] file/dir...]',
description='Custom functionality extensions of the common "ls" tool.')
parser.add_argument('paths', nargs='*', help=dd('''
File/dir path arguments for various options below.
If no other options are provided, tool will simply list all existing
path(s), printing errors on stderr for those that can't be accessed.
Unlike "ls" tool, directory contents aren't listed
recursively, unless -r/--recursive option is used.'''))
parser.add_argument('-f', '--files', action='store_true',
help='Print only files, not dirs or any other stuff.')
parser.add_argument('-0', '--null', action='store_true',
help='Print null-terminated list, instead of default newline-terminated.')
parser.add_argument('-r', '--recursive', action='store_true',
help='Recurse into specified directories, if other options support that.')
parser.add_argument('-a', '--adjacent', metavar='opts', action='append', help=dd('''
List specified file/dir path, and also N files/dirs adjacent to it.
"Adjacent" as in located within same directory, and sorted right before/after it.
Requires a parameter, consisting of optional numbers/letters:
number (digits) - count of items (default=10) to display before/after path(s),
counting before/after separately (if enabled), and not counting path itself.
b/B - before - only list entries sorted before specified one(s).
a/A - after - same as "before" above, can be combined for before + after (default).
n/N - name - order items by filenames alphabetically (default).
t/T - time - order items by mtime.
s/S - size - order items by apparent filesize (from stat.st_size).
Default is to return 10 before/after items with alphabetical ordering.
Some file/dir arguments must be specified for this option.
Option can be used multiple times to select more items by additional criteria,
in which case they will be returned sorted by the first
parameter that's explicitly specified (or default alpha-sorted otherwise).'''))
parser.add_argument('-t', '--mtime',
metavar='delta-or-range', action='append', help=dd('''
List files/dirs that have modification time within specified range or vicinity.
Either time delta or time range(s) can be specified with this option:
delta examples: 1h30m, 1d, 2w 3d 4h, 4:10:20, 55s, 55, ...
range examples: 5d/8d, 1d/now, 2024-10-20 4:10:20 / 2mo, 1:00/6:00, ...
If delta (relative time) is specified, adjacent files/dirs within mtime-vicinity
from file/dir command-line arguments are listed (and at least one is required).
If time range is specified (two absolute or relative timestamps in any order),
then files/dirs that have mtime within that range are listed, either in the
current dir, or among/within specified file/dir arguments (see -r/--recursive option).
Multiple time ranges can be specified.
Results are ordered by mtime, pipe through "sort" for alphabetic ordering.
Works somewhat similar to "find" tool with its -mtime option,
but mostly intended to easily list files created/modified around the same time.'''))
opts = parser.parse_args(sys.argv[1:] if argv is None else argv)
pp_first, pp_nullsep = True, opts.null
def _pp(p):
nonlocal pp_first
p, (cc, cn) = str(p), ('\n', 'newline') if not pp_nullsep else ('\0', 'null-char')
if cc in p: p_err(f'Skipping entry with {cn} in it: {p!r}')
if not pp_nullsep: p = f'{p}\n'
elif not pp_first: p = f'\0{p}'
else: pp_first = False
sys.stdout.write(p)
no_opts, paths = True, list(pl.Path(p) for p in opts.paths)
if opts.adjacent:
for p in list_adjacent(paths, opts.adjacent, opts.files): _pp(p)
no_opts = False
if opts.mtime:
no_opts, (tds, td_vicinity) = False, mtime_parse(opts.mtime, parser.error)
if td_vicinity:
if not paths: parser.error(
'At least one path argument required for -t/--mtime time-delta' )
for p in mtime_list_vicinity(paths, tds): _pp(p)
else:
ps = list()
if opts.recursive and not paths: paths = [pl.Path('.')]
for p in paths:
if not p.exists(): p_err(f'path inaccessible [ {p} ]'); continue
if opts.recursive and p.is_dir(): ps.extend(p.iterdir())
elif not opts.files or p.is_file(): ps.append(p)
for p in mtime_list_ranges(ps, tds): _pp(p)
if no_opts: # no options - print filtered input paths back
if opts.recursive and not paths: paths = [pl.Path('.')]
for p in paths:
if not p.exists(): p_err(f'path inaccessible [ {p} ]'); continue
if opts.recursive and p.is_dir():
for p2 in p.iterdir(): _pp(p2)
elif not opts.files or p.is_file(): _pp(p)
if __name__ == '__main__':
try: sys.exit(main())
except BrokenPipeError: # stdout pipe closed
os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
sys.exit(1)