#!/usr/bin/env python
# prometheus-grafana-simplejson-aggregator (forked from mk-fg/fgtk)
import urllib.request as ur, urllib.parse as up, urllib.error as ue
import itertools as it, functools as ft, operator as op
import contextlib as cl, datetime as dt, collections as cs
import os, sys, json, logging, enum, calendar, time, re
class LogMessage:
def __init__(self, fmt, a, k): self.fmt, self.a, self.k = fmt, a, k
def __str__(self): return self.fmt.format(*self.a, **self.k) if self.a or self.k else self.fmt
class LogStyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None): super().__init__(logger, extra or dict())
def log(self, level, msg, *args, **kws):
if not self.isEnabledFor(level): return
log_kws = {} if 'exc_info' not in kws else dict(exc_info=kws.pop('exc_info'))
msg, kws = self.process(msg, kws)
self.logger._log(level, LogMessage(msg, args, kws), (), **log_kws)
get_logger = lambda name: LogStyleAdapter(logging.getLogger(f'grafana-agg.{name}'))
def str_part(s, sep, default=None):
'''Splits string by separator, always returning two values.
Separator can be empty to split on spaces. Examples:
str_part("user@host", "<@", "root"), str_part("host:port", ":>")
word, rest = str_part("some random words", "<<")'''
seps = sep.strip(c := sep.strip('<>') or None)
fill_left = seps.startswith('<')
ss = s.split(c, 1) if fill_left else s.rsplit(c, 1)
if len(ss) == 2: return ss
if not fill_left: return [s, default]
return [default, s] if seps != '<<' else [s, default]
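# Worked str_part() examples, derived from the logic above:
#  str_part('user@host', '<@', 'root') -> ['user', 'host']
#  str_part('host', '<@', 'root') -> ['root', 'host'] (no sep, so default fills the "<" side)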
def val_bytes_iec(v):
if v < 1000: return f'{v:,} B'
for n, u in enumerate('KMG', 1):
div = 2**(n*10)
if v < 1000 * div: break
return f'{v/div:,.1f} {u}iB'
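# E.g. val_bytes_iec(500) -> '500 B', val_bytes_iec(1536) -> '1.5 KiB',
#  val_bytes_iec(10 * 2**30) -> '10.0 GiB'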
def date_inc(date, d=0, w=0, m=0, y=0):
if y: date = date.replace(year=date.year + y)
if m:
month = date.month - 1 + m
year = date.year + month // 12
month = month % 12 + 1
day = min(date.day, calendar.monthrange(year, month)[1])
date = dt.date(year, month, day)
if w: date += dt.timedelta(days=w * 7)
if d: date += dt.timedelta(days=d)
return date
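# Month arithmetic above clamps the day-of-month,
#  e.g. date_inc(dt.date(2020, 1, 31), m=1) -> dt.date(2020, 2, 29)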
def date_round(date, span, up=False):
if span is Span.year: date = date.replace(day=1, month=1)
elif span is Span.month: date = date.replace(day=1)
elif span is Span.week: date -= dt.timedelta(days=date.weekday())
if up: date = date_inc(date, **{span.value: 1})
return date
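# E.g. date_round(dt.date(2020, 5, 20), Span.month) -> 2020-05-01,
#  or 2020-06-01 (start of the next span) with up=True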
def date_span_repr(date, span):
if span == Span.day: date = date.strftime('%Y-%m-%d')
elif span == Span.week:
date_we = date + dt.timedelta(days=7)
date = date.strftime('%b %d %Y').split()
		date = f'{date[0]} {date[1]}-{date_we.strftime("%d")} {date[2]}'
elif span == Span.month: date = date.strftime('%b %Y')
elif span == Span.year: date = date.strftime('%Y')
return date
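# E.g. date_span_repr(dt.date(2020, 5, 4), Span.week) -> 'May 04-11 2020' (C locale)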
class Span(enum.Enum):
day = 'd'
week = 'w'
month = 'm'
year = 'y'
class AggMode(enum.Enum):
total = 'total'
# sum = 'sum'
class AggDB:
# db_migrations can have multiple statements, separated by ;
_db, _db_migrations = None, ['''
create table if not exists meta (
var text not null primary key, val text not null );
create table if not exists data (
id integer not null primary key autoincrement,
metric text not null, span text not null,
date text not null, labels text not null, value int not null );
create unique index if not exists data_points on data (metric, span, date, labels);''']
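	# Example of a stored "data" row (hypothetical metric/labels):
	#  metric='iface_traffic_bytes_day', span='d', date='2020-05-04', labels='dir=in', value=12345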
def __init__(self, path, lock_timeout=60, lazy=True):
import sqlite3
self._sqlite, self._ts_activity = sqlite3, 0
self._db_kws = dict( database=path,
isolation_level='IMMEDIATE', timeout=lock_timeout )
self.log = get_logger('db')
if not lazy: self._db_init()
def close(self, inactive_timeout=None):
if ( inactive_timeout is not None
and (time.monotonic() - self._ts_activity) < inactive_timeout ): return
if self._db:
self._db.close()
self._db = None
def __enter__(self): return self
def __exit__(self, *err): self.close()
def _db_init(self):
self._db = self._sqlite.connect(**self._db_kws)
sk, sv_new = 'schema-version', len(self._db_migrations)
with self._db_cursor() as c:
c.execute('pragma journal_mode=wal')
c.execute('savepoint sv')
try:
# Ideally this would be "select ... for update", but not with sqlite
c.execute('select val from meta where var = ?', (sk,))
row = c.fetchall()
except self._sqlite.OperationalError as err:
if not err.args[0].startswith('no such table:'): raise
c.execute('rollback to sv')
row = None
c.execute('release sv')
sv = int(row[0][0]) if row else 0
if sv_new <= sv: return
for sv, sql in enumerate(self._db_migrations[sv:], sv+1):
for st in sql.split(';'): c.execute(st)
c.execute('replace into meta (var, val) values (?, ?)', (sk, str(sv)))
@cl.contextmanager
def _db_cursor(self):
self._ts_activity = time.monotonic()
if not self._db: self._db_init()
with self._db as conn, cl.closing(conn.cursor()) as c: yield c
def add_point(self, metric, span, date, labels, value):
with self._db_cursor() as c:
c.execute(
'replace into data (metric, span, date, labels, value)'
' values (?, ?, ?, ?, ?)', (metric, span.value, date, labels, value) )
def get_last_date(self, metric, span, labels):
'''Returns date (str) of last stored value for metric/span/labels,
but only if it's not the only value for these parameters, None otherwise.'''
with self._db_cursor() as c:
c.execute(
'select date from data where metric = ? and span = ?'
' and labels = ? order by date desc limit 2', (metric, span.value, labels) )
v = c.fetchall()
return v[0][0] if v and len(v) == 2 else None
def get_metric_list(self):
with self._db_cursor() as c:
c.execute( 'select metric, labels from data'
' group by metric, labels order by metric, labels' )
return list(f'{row[0]}[{row[1]}]' for row in (c.fetchall() or list()))
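	# get_metric_list() entries look like 'iface_traffic_bytes_day[dir=in]' (hypothetical names)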
_metric_row = cs.namedtuple('metric', 'date metric labels value')
def get_metric_data(self, metric_label_list, date_range=None):
if not metric_label_list: return list()
with self._db_cursor() as c:
q_where, q_vals, q_order = list(), list(), 'metric, labels'
for metric, labels in metric_label_list:
if labels:
q_where.append('(metric = ? and labels = ?)')
q_vals.extend([metric, labels])
else:
q_where.append('metric = ?')
q_vals.append(metric)
q_where = '(' + ' or '.join(q_where) + ')'
if date_range:
date0, date1 = date_range
q_where, q_order = q_where + ' and (date >= ? and date <= ?)', q_order + ', date'
q_vals.extend([date0, date1])
q_keys = ', '.join(self._metric_row._fields)
q = f'select {q_keys} from data where {q_where} order by {q_order}'
self.log.debug('Q: {!r} {}', q, q_vals)
c.execute(q, q_vals)
return list(self._metric_row(*row) for row in (c.fetchall() or list()))
### ----- Aggregator mode
class AggError(Exception): pass
class Aggregator:
def __init__(self, db, src, dst, labels, mode, url, spans):
self.src, self.dst, self.labels = src, dst, labels
self.db, self.mode, self.url = db, mode, url
self.spans, self.val_cache = list(Span(s) for s in spans), dict()
self.log = get_logger('agg')
if self.mode != AggMode.total: raise NotImplementedError(self.mode)
def _q_all(self, m, q, **kws):
url = f'{self.url}/api/v1/{m}'
if q or kws:
req_qs = up.urlencode(dict(query=q, **kws))
url = f'{url}?{req_qs}'
if url in self.val_cache: return self.val_cache[url]
# self.log.debug('prms-q: {}', url)
with ur.urlopen(url) as res_http:
res = json.loads(res_http.read().decode())
if res.get('status') != 'success': raise AggError(m, q, res)
self.val_cache[url] = res['data']
return res['data']
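	# Worked example (hypothetical "foo" metric): _q_all('query', 'max_over_time(foo[1d])',
	#  time=1600000000) fetches {url}/api/v1/query?query=max_over_time%28foo%5B1d%5D%29&time=1600000000
	#  and returns the decoded "data" value from the response json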
def _q(self, m='query', q=None, res_type=None, lvs_filter=None, **q_kws):
qi = lambda: f'({m} {q!r} {q_kws})'
try: res = self._q_all(m, q, **q_kws)
except ue.HTTPError as err: raise AggError(f'{qi()} http error: {err}')
if res_type and res['resultType'] != res_type:
raise AggError(f'{qi()} incorrect result type [{res_type} {lvs_filter}]: {res}')
if lvs_filter:
if lvs_filter is True:
rs = res.get('result', list())
if len(rs) == 1: return rs[0]
raise AggError(f'{qi()} expecting single value [{lvs_filter}]: {res}')
for rs in res.get('result') or list():
for k, v in lvs_filter:
if rs['metric'][k] != v: break
else: return rs
else: raise AggError(f'{qi()} no result for label(s) [{lvs_filter}]: {res}')
return res
def run(self):
label_values = list()
for v in self.labels: label_values.append(self._q(f'label/{v}/values'))
for s, *lvs in it.product(self.spans, *label_values): self.agg_span(s, lvs)
def _agg_span_ext_val(self, t, m, ts, delta_days, lvs_filter):
		# It can be inefficient to scan the whole delta_days range (e.g. a year),
		# so the compromise is to query progressively wider windows
		# (1d, 7d, ... up to 400d) at the relevant end of the span until a sample is found
err_last, q = None, ft.partial(self._q, 'query', lvs_filter=lvs_filter, res_type='vector')
for delta_max in [1, 7, 30, 120, 400]:
d = min(delta_max, delta_days)
ts_q = ts if t == 'max' else (ts + d * 24 * 3600)
try:
rs = q(f'{t}_over_time({m}[{d}d])', time=ts_q)
try: ts_v, v = rs['value']
except: raise AggError('value-end') from None
return d, ts_v, v
except AggError as err: err_last = err
if delta_max >= delta_days: break
raise err_last
def agg_span(self, span, lvs):
lvs_filter = list(zip(self.labels, lvs))
labels = ' '.join(sorted(f'{k}={v}' for k,v in lvs_filter))
metric_src = f'{self.src}_total'
metric_dst = self.dst.format(span=span.name)
date0 = self.db.get_last_date(metric_dst, span, labels)
if date0: date0_ts, date0 = None, dt.date(*map(int, date0.split('-')))
else:
metric_ts = f'{self.src}_created'
rs = self._q(
q=f'min_over_time({metric_ts}[10y])',
res_type='vector', lvs_filter=lvs_filter )
date0_ts = float(rs['value'][1]) # note: not usable for precise queries
date0 = date_round(dt.date.fromtimestamp(date0_ts), span)
date_now = dt.date.today()
self.log.debug( 'agg-span [{} {}]: {}+ [{}]', span.value,
labels, date0, date0_ts and dt.datetime.fromtimestamp(date0_ts) )
date_ts = lambda date: dt.datetime(date.year, date.month, date.day).timestamp()
while date0 <= date_now:
value, err, date1 = None, None, date_inc(date0, **{span.value: 1})
delta_days = (date1 - date0).days
try:
d0, ts0, v0 = self._agg_span_ext_val( 'min', metric_src,
date0_ts or date_ts(date0), delta_days, lvs_filter )
d1, ts1, v1 = self._agg_span_ext_val(
'max', metric_src, date_ts(date1), delta_days, lvs_filter )
value = int(v1) - int(v0)
# self.log.debug(
# 'agg-span-values [{} {} {}]: {} [{}:{}] - {} [{}:{}]',
# span.value, date0, date1, v0, ts0, d0, v1, ts1, d1 )
except AggError as err:
self.log.debug(
'Missing/bogus value {!r} [{}]: {} - {} [{}]',
self.src, labels, date0, date1, err )
value = 0
self.log.debug('agg-span-db [{} {} {}]: {!r} {:,}', span.value, date0, date1, labels, value)
self.db.add_point(metric_dst, span, date0, labels, value)
date0, date0_ts = date1, None
### ----- Export-httpd mode
import io, base64
log_uid = lambda: base64.urlsafe_b64encode(os.urandom(4 * 6 // 8)).rstrip(b'=').decode()
def log_lines(log_func, lines, log_func_last=False, uid=None, **log_func_kws):
if isinstance(lines, str): lines = (line.rstrip() for line in lines.rstrip().split('\n'))
if uid is None: uid = log_uid()
n_last = len(lines := list(lines))
for n, line in enumerate(lines, 1):
if not isinstance(line, str): line = line[0].format(*line[1:])
if uid is not False: line = f'[{uid}] {line}'
if log_func_last and n == n_last: log_func_last(line)
else: log_func(line, **log_func_kws)
def log_req_body( body, buff=None,
tail=20, len_repr=64 * 2**10, len_max=4 * 2**20, bs=2**20 ):
body_repr = body.read(len_repr)
if buff: buff.write(body_repr)
c = body.read(1)
if c:
if buff: buff.write(c)
n, chunk = len(body_repr) + 1, b''
while n < len_max:
chunk = body.read(bs)
if not chunk: break
n += len(chunk)
if buff: buff.write(chunk)
n = f'{n:,d} B' if not chunk else f'{n:,d}+ B'
body_repr += f' ...(len=[{n}])'.encode()
try: return body_repr.decode()
except UnicodeDecodeError:
h = body_repr[:tail].decode('utf-8', 'surrogateescape')
t = body_repr[-tail:].decode('utf-8', 'surrogateescape')
return f'<binary [{len(body_repr):,d} B] [{h} ... {t}]>'
def log_req(e, body=None, err_file=sys.stderr, buff=None):
headers = dict((k[5:], e.pop(k)) for k in list(e) if k.startswith('HTTP_'))
body_lines = (log_req_body(body, buff) or list()) if body else list()
if body_lines: body_lines = list((' ' + line) for line in body_lines.split('\n'))
req_get = lambda k: e.get(k.upper()) or '???'
log_func = ft.partial(log_lines, lambda line: err_file.write(line + '\n'))
log_func([
( '--- req: {} {} {}', req_get('request_method'),
req_get('path_info'), req_get('server_protocol') ),
'- headers:', *((' {}: {}', k, v) for k, v in headers.items()),
'- env:', *((' {}: {}', k, v) for k, v in sorted(e.items(), key=op.itemgetter(0))),
('- body [{:,d} line(s)]:', len(body_lines)), *body_lines, '--- req end' ])
def log_req_print_res(res, **print_kws):
info_lines = res.info().as_string().strip().split('\n')
body = res.read().decode('utf-8', 'surrogateescape')
	print('Response:', str(res.getcode()), str(res.geturl()).strip(), **print_kws)
print('Info:\n' + '\n'.join(f' {line}' for line in info_lines), **print_kws)
print(f'Body:\n----------\n{body}\n----------', **print_kws)
class HTTPResponse(Exception):
def __init__(self, res_code='200 OK', res=None, res_ct='text/plain'):
if isinstance(res, bytes): pass
elif isinstance(res, str): res = res.encode()
elif res is None: res = (res_code + '\n').encode()
else: res, res_ct = json.dumps(res).encode(), 'application/json'
super().__init__(res_code, res, res_ct)
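# Handlers below signal responses by raising these, e.g.:
#  raise HTTPResponse('405 Method Not Allowed') or HTTPResponse(res=dict(status='ok'))
#  (dict/list "res" values get returned as application/json)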
class ExportHTTPd:
db_check_sig, db_check_timeout, db_check_interval = 153, 60, 450
def __init__(self, db_path, debug=False):
self.db_path, self.debug = db_path, debug
self.log = get_logger('httpd')
__db = None
@property
def _db(self):
if not self.__db:
self.__db = AggDB(self.db_path)
# Timer gets rescheduled so that it can stop on error/gc/etc
uwsgi.register_signal( self.db_check_sig,
f'worker{uwsgi.worker_id()}', self._db_close_check )
uwsgi.add_rb_timer(self.db_check_sig, self.db_check_interval, 1)
return self.__db
def _db_close_check(self, sig):
self.__db.close(self.db_check_timeout)
uwsgi.add_rb_timer(self.db_check_sig, self.db_check_interval, 1)
def req(self, e, body, err_file, start_response):
if self.debug:
body, body_raw = io.BytesIO(), body
log_req(e, body_raw, err_file, buff=body)
body.seek(0)
res_code, res = '500 Internal Server Error', b''
res_headers, res_ct = dict(), 'text/plain'
try: self._req_proc(e, body)
except HTTPResponse as res_ex: res_code, res, res_ct = res_ex.args
if e.get('REQUEST_METHOD', '').lower() == 'head': res = b''
start_response(res_code, [
('Access-Control-Allow-Origin', '*'),
('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
('Access-Control-Allow-Headers',
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'),
('Content-type', res_ct),
('Content-Length', str(len(res))) ])
if self.debug: print(f'--- res-data: {res}', file=err_file, flush=True)
return [res]
def _req_proc(self, e, body):
req = up.urlparse(e.get('REQUEST_URI', ''))
req_verb = e.get('REQUEST_METHOD', '').lower()
if req.path == '/':
if req_verb not in ['get', 'head', 'options']:
raise HTTPResponse('405 Method Not Allowed')
raise HTTPResponse('200 OK', b'OK\n')
else:
if e.get('REQUEST_METHOD', '').lower() not in ['get', 'post', 'head']:
raise HTTPResponse('405 Method Not Allowed')
if req.path == '/search':
raise HTTPResponse(res=self._db.get_metric_list())
elif req.path == '/query':
query = body.read(256 * 2**10)
if body.read(1): raise ValueError('body too large')
try: query = json.loads(query.decode('utf-8', 'backslashreplace'))
except json.JSONDecodeError: raise HTTPResponse('400 Bad Request')
self._req_query(query)
raise HTTPResponse('400 Bad Request')
def _req_query(self, q):
res = list()
parse_date = lambda ts: dt.date.fromtimestamp(
dt.datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%fZ').timestamp() )
date0, date1 = (parse_date(d) for d in [q['range']['from'], q['range']['to']])
for qt in q['targets']:
data, qts = None, qt.get('target') or ''
if m := re.search(r'^\[(\w+)(?::(\w))?\]\s+(\S+.*)', qts):
fmt, qs, qts = m.groups()
if fmt == 'table': data = self._query_table(qt, qs, qts, date0, date1)
else: self.log.debug('Unrecognized format in query: {!r}', fmt)
else: data = self._query_metric(qt, date0, date1)
res.append(data)
raise HTTPResponse(res=res)
def _query_metric(self, qt, date0, date1):
# Query example: iface_traffic_bytes_day[dir=in]:m@traffic-in
# ":m" - optional span (default=d), "@traffic-in" - resulting value name
# https://github.com/grafana/simple-json-datasource
# https://grafana.com/grafana/plugins/grafana-simple-json-datasource
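		# E.g. 'iface_traffic_bytes_day[dir=in]:m@traffic-in' parses into
		#  qk='iface_traffic_bytes_day[dir=in]', qs=Span.month, qa='traffic-in'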
qt, qid, qtt = qt.get('target'), qt.get('refId'), qt.get('type')
qk, qa = str_part(qt, '@>')
qk, qs = str_part(qk, ':>')
qs = Span(qs) if qs else Span.day
date_range = date_round(date0, qs), date_round(date1, qs, up=True)
if qk and (m := re.findall(r'^([^\]]+)(?:\[([^\]]+)\])?$', qk)):
metric, labels = m[0]
data = self._db.get_metric_data([(metric, labels)], date_range)
data = list( # (value, timestamp[ms])
(d.value, int(dt.datetime.fromisoformat(d.date).timestamp())*1000)
for d in data )
else:
self.log.warning('Unrecognized query metric format: {!r}', qk)
data = list()
if qtt == 'timeserie':
return dict(target=qt, refId=qid, type=qtt, datapoints=data)
elif qtt == 'table':
return dict(
target=qt, refId=qid, type=qtt, rows=data,
columns=[dict(text=qa or qk, type='number'), dict(text='Time', type='time')] )
else: self.log.debug('Unrecognized query target type: {!r}', qtt)
def _query_table(self, qt, qs, qts, date0, date1):
# Query example: [table:m] date:-1@Date iface_traffic_bytes_day[dir=in]@'traffic in' ...
		# Same as _query_metric above, except it returns a table with the specified metrics, in that field order
# Special "date" column is a string representation of the specified timespan
# Metric name alias can be parsed as json string value if it's in double quotes
# -1 in "date:-1@Date" is a "sort by this first, reverse order", comes before alias part
qid, qtt, qs = qt.get('refId'), qt.get('type'), Span(qs or 'd')
date_range = date_round(date0, qs), date_round(date1, qs, up=True)
# Parse metrics/labels and processing tags from query
m_label_list, m_meta, m_date_idx, m_sort = list(), list(), list(), list()
qts_base, alias_dec = qts, json.JSONDecoder()
while qts:
if re.search(r'^\S+@\S', qts):
qk, qts = qts.split('@', 1)
if qts[0] in '"': # decode as json string
qa, pos = alias_dec.raw_decode(qts, 0)
qts = qts[pos:].lstrip()
else: qa, qts = str_part(qts, '<<')
else: qa, (qk, qts) = None, str_part(qts, '<<')
qk, qo = str_part(qk, ':')
if not qa: qa = qk
if not (m := re.findall(r'^([^\]]+)(?:\[([^\]]+)\])?$', qk)):
self.log.warning('Unrecognized query metric format: {!r}', qk)
continue
metric, labels = m = m[0]
if qo := int(qo or 0): m_sort.append((abs(qo), qo<0, len(m_label_list)))
if metric != 'date': m_label_list.append(m)
else: m_date_idx.append(len(m_label_list))
m_meta.append((m, qa))
# Build table with fields from m_meta in same order, sorted according to m_sort
# Group metrics by date
data = self._db.get_metric_data(m_label_list, date_range)
table = cs.defaultdict(dict)
for row in data:
k_row = row.metric, row.labels
			for k, alias in m_meta:
				# Store "date" column value under its alias too, so itemgetter() below finds it
				if k[0] == 'date': table[row.date][alias] = row.date
				elif k == k_row:
					# if proc == 'bytes': v = val_bytes_iec(v)
					table[row.date][alias] = row.value
		# Keep columns in m_meta order, so that m_date_idx/m_sort offsets match table_header
		table_header = list(map(op.itemgetter(1), m_meta))
table_types = ['number'] * len(table_header)
table, get_vals = list(table.items()), op.itemgetter(*table_header)
for n, (date, metrics) in enumerate(table): table[n] = list(get_vals(metrics))
# Sort rows by m_sort specs
for _, rev, n in sorted(m_sort, reverse=True):
table.sort(key=lambda ms: ms[n], reverse=rev) # python has stable sort
# Format dates, now that they no longer have to be compared
if m_date_idx:
for n in m_date_idx: table_types[n] = 'string'
for ms in table:
date = ms[m_date_idx[0]] = date_span_repr(
dt.date.fromisoformat(ms[m_date_idx[0]]), qs )
for n in m_date_idx[1:]: ms[n] = date
# Results
if qtt == 'timeserie':
data = list(dict(zip(table_header, m)) for m in table)
return dict(target=qt, refId=qid, type=qtt, datapoints=data)
elif qtt == 'table':
return dict( target=qt, refId=qid, type=qtt, rows=table,
columns=list(dict(text=k, type=t) for k, t in zip(table_header, table_types)) )
else: self.log.debug('Unrecognized query target type: {!r}', qtt)
### ----- CLI / uwsgi
default_db_path = 'data.sqlite'
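# Hypothetical example invocations (untested sketch, metric names are placeholders):
#  ./prometheus-grafana-simplejson-aggregator -d data.sqlite \
#   -a 'iface_traffic_bytes:iface_traffic_bytes_{span}' --agg-labels dir
#  uwsgi --http :8080 --env PMA_DB_PATH=data.sqlite \
#   --wsgi-file prometheus-grafana-simplejson-aggregator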
def main(args=None):
import argparse, textwrap
dd = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')
fill = lambda s,w=90,ind='',ind_next=' ',**k: textwrap.fill(
s, w, initial_indent=ind, subsequent_indent=ind if ind_next is None else ind_next, **k )
parser = argparse.ArgumentParser(
usage='%(prog)s [options] [-a|--aggregate]',
formatter_class=argparse.RawTextHelpFormatter,
description=dd('''
Aggregate data queried from prometheus by-day/w/mo/etc and store in sqlite
with -a/--aggregate option, or export it via Simple-JSON API for Grafana.
			Default is to run export-httpd under uwsgi, which is required for that mode.'''))
group = parser.add_argument_group('Common/basic options')
group.add_argument('-d', '--db',
metavar='path', default=default_db_path,
help=dd('''
Path to sqlite database to update and load values from.
Can be specified as PMA_DB_PATH env var in uwsgi mode.'''))
group.add_argument('--debug', action='store_true',
help='Verbose operation mode.'
' Non-empty PMA_DEBUG env will enable it in uwsgi mode.')
group = parser.add_argument_group('Aggregation mode')
group.add_argument('-a', '--aggregate', metavar='src:dst',
help=dd('''
Aggregate src -> dst metric.
Source metric will be queried for specified timespans,
and will be auto-suffixed by _total and _created and such, as necessary.
Destination metric name should be a template in python str.format,
			with one templated parameter - "span" (day/week/month/year).'''))
group.add_argument('-m', '--agg-mode',
metavar='mode', default='total', choices=['total'],
help=dd('''
Aggregation mode (default: %(default)s):
- "total" - compare values at the start and end of each period.
- "sum" - not implemented, sum values for each period.'''))
group.add_argument('-p', '--prometheus-url', metavar='url',
default='http://localhost:9090/', help='Prometheus base URL. Default: %(default)s')
group.add_argument('--agg-spans', metavar='letters', default='dwmy',
help='Letters for timespans for which'
' to aggregate data (d=day, w=week, etc). Default: %(default)s')
group.add_argument('--agg-labels', metavar='labels',
help='Space/comma-separated labels to query for metric.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
log = get_logger('main')
if not opts.aggregate: parser.error('Running in server mode, but uwsgi not detected')
if set(opts.agg_spans).difference(s.value for s in Span):
parser.error(f'Unknown/extra span type(s): {opts.agg_spans!r}')
if not opts.agg_labels: parser.error('Option --agg-labels is required')
if ':' not in opts.aggregate:
parser.error( '-a/--aggregate must be in'
f' "src-metric:dst-metric" format: {opts.aggregate!r}' )
src, dst = opts.aggregate.split(':', 1)
log.debug('Running in aggregator mode...')
with AggDB(opts.db) as db:
Aggregator( db, src, dst, labels=opts.agg_labels.replace(',', ' ').split(),
mode=AggMode(opts.agg_mode), url=opts.prometheus_url, spans=opts.agg_spans ).run()
log.debug('Finished')
def init_uwsgi_handler():
httpd = ExportHTTPd(
db_path=os.environ.get('PMA_DB_PATH') or default_db_path,
debug=bool(os.environ.get('PMA_DEBUG')) )
logging.basicConfig(level=logging.DEBUG if httpd.debug else logging.WARNING)
log = get_logger('main')
log.debug('Running in export-httpd mode...')
log.debug('uwsgi-env config: debug={} db-path={!r}', httpd.debug, httpd.db_path)
import traceback as tb
def application(e, start_response):
body, err_file = map(e.pop, ['wsgi.input', 'wsgi.errors'])
try: yield from httpd.req(e, body, err_file, start_response)
except Exception as err:
tb.print_exc(file=err_file)
raise
err_file.flush()
return application
if __name__ == '__main__': sys.exit(main())
elif __name__.startswith('uwsgi_'):
import uwsgi # to make sure uwsgi is there
application = init_uwsgi_handler()