-
Notifications
You must be signed in to change notification settings - Fork 867
/
caching_metric.py
159 lines (143 loc) · 4.58 KB
/
caching_metric.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Caching metric class for TS
"""
import logging
import socket
import time
from ts.metrics.dimension import Dimension
from ts.metrics.metric_abstract import MetricAbstract
from ts.metrics.metric_type_enum import MetricTypes
logger = logging.getLogger(__name__)
class CachingMetric(MetricAbstract):
"""
Class for generating metrics and printing it to stdout of the worker
"""
def __init__(
self,
metric_name: str,
unit: str,
dimension_names: list = None,
metric_type: MetricTypes = MetricTypes.COUNTER,
):
"""
Constructor for CachingMetric
CachingMetric reports collected metrics to stdout of worker
Parameters
----------
metric_name str
Name of metric
unit str
unit can be one of ms, percent, count, MB, GB or a generic string
dimension_names list
list of dimension names which should be strings
metric_type MetricTypes
Type of metric Counter, Gauge, Histogram
"""
super().__init__(metric_name, unit, dimension_names, metric_type)
def _validate_and_get_dimensions(
self,
dimension_values: list,
) -> list:
"""
Validates that the dimension values match the dimension names
amd creates dimension objs
Parameters
----------
dimension_values
values corresponding to the metrics dimension names
Returns
-------
list of dimension objects or ValueError
"""
if dimension_values is None or len(dimension_values) != len(self.dimension_names):
raise ValueError(
f"Dimension values: {dimension_values} should "
f"correspond to Dimension names: {self.dimension_names}"
)
dim_objs = []
for dim_name, dim_value in zip(self.dimension_names, dimension_values):
dim_objs.append(Dimension(dim_name, dim_value))
return dim_objs
def _validate_metric_value(
self,
value: int or float,
) -> None:
"""
Validations for metric value
Parameters
----------
value
metric value to be updated
"""
if self.metric_type == MetricTypes.COUNTER and value < 0:
raise ValueError("Counter metric update value cannot be negative")
def emit_metrics(
self,
request_id: str,
value: int or float,
dimension_string: str,
) -> None:
"""
Logs metrics to
Parameters
----------
request_id
value
dimension_string
"""
metric_str = f"[METRICS]{self.metric_name}.{self.unit}:{value}|#{dimension_string}|" \
f"#hostname:{socket.gethostname()},{int(time.time())}"
if request_id:
logger.info(f"{metric_str},{request_id}")
else:
logger.info(metric_str)
def add_or_update(
self,
value: int or float,
dimension_values: list = [],
request_id: str = "",
):
"""
Update metric value, request id and dimensions
Parameters
----------
value : int, float
metric to be updated
dimension_values : list
list of dimension values
request_id : str
request id to be associated with the metric
"""
dimension_str = ""
try:
dimension_objs = self._validate_and_get_dimensions(dimension_values)
dimension_str = ",".join([str(d) for d in dimension_objs])
self._validate_metric_value(value)
except ValueError as ex:
logger.error(
f"[METRICS]Failed to update metric with name:"
f"{self.metric_name} and dimensions: {dimension_str} "
f"with value: {value}: {ex}"
)
else:
self.emit_metrics(request_id, value, dimension_str)
def update(
self,
value: int or float,
request_id: str = "",
dimensions: list = [],
):
"""
BACKWARDS COMPATIBILITY: Update metric value
Parameters
----------
value : int, float
metric to be updated
request_id : str
request id to be associated with the metric
dimensions : list
list of dimension values
"""
logger.warning("Overriding existing dimensions")
self.dimension_names = [dim.name for dim in dimensions]
self.add_or_update(value, [dim.value for dim in dimensions], request_id)