worker.py
"""
This module operates as the entry-point of this project.
"""
import json
import logging.config
import time
import schedule
from kafka import KafkaProducer
from kafka.errors import KafkaError
from prometheus_client.v1 import query_range
from settings import LOGGING, SCHEDULER_SECONDS, PROMETHEUS_METRICS_LIST, KAFKA_API_VERSION, \
KAFKA_SERVER, KAFKA_KUBERNETES_TOPIC
from utils import convert_unix_timestamp_to_datetime_str, retrieve_values, \
calculate_packet_loss_values
logging.config.dictConfig(LOGGING)
logger = logging.getLogger("publisher")
error_logger = logging.getLogger("errors")
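# Note: LOGGING (imported from settings) is expected to define the 'publisher'
# and 'errors' loggers fetched above; their configuration is not shown here.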

def main():
    """Main process: collect metrics from Prometheus and publish them to Kafka."""
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
                             api_version=KAFKA_API_VERSION,
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    prom_ql = query_range.QueryRange(token=None)
    # Keep the metrics needed for the packet loss calculation
    tx_rx_metrics = {}
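    # Each entry in PROMETHEUS_METRICS_LIST is expected to be a dict providing at
    # least the 'name', 'unit' and 'type' keys (all three are used below).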
    for metric in PROMETHEUS_METRICS_LIST:
        try:
            # Load the values of the requested metric for every running container
            # that carries the label 'vim_id'
            response = retrieve_values(prom_ql, metric['name'])
            if response.status_code != 200:
                error_logger.error("GET {} - {}".format(response.url, response.text))
                continue
            response_body = response.json()
            result_type = response_body['data'].get('resultType')
            if result_type != "matrix":
                logger.warning("The `resultType` is not `matrix`. It is `{}`".format(result_type))
            # The response of the `query_range` request returns the result type
            # (`matrix`) and a list of results. Each object in the results list
            # includes one or more values of the requested metric per container ID;
            # the `label_vim_id` key reflects the container ID.
            for result in response_body['data'].get('result', []):
                metric_values = []
                osm_container_id = result.get('metric', {}).get('label_vim_id', None)
                # Skip processing if the container is not related to OSM
                if osm_container_id is None:
                    continue
                # Add an empty dict for a new OSM container
                if osm_container_id not in tx_rx_metrics:
                    tx_rx_metrics[osm_container_id] = {}
                # Keep only the latest value of the requested metric for the given
                # container ID
                if result['values']:
                    latest_value = result['values'][-1]
                    metric_values.append(
                        {"timestamp": convert_unix_timestamp_to_datetime_str(latest_value[0]),
                         "unit": metric['unit'], "type": metric['type'], "name": metric['name'],
                         "value": latest_value[1]})
                    # Temporarily save the metrics needed for the packet loss calculation
                    if metric["name"] in ["container_network_receive_packets_dropped_total",
                                          "container_network_receive_packets_total",
                                          "container_network_transmit_packets_dropped_total",
                                          "container_network_transmit_packets_total"]:
                        tx_rx_metrics[osm_container_id][metric["name"]] = latest_value[1]
                        proper_tm = convert_unix_timestamp_to_datetime_str(latest_value[0])
                        tx_rx_metrics[osm_container_id]['timestamp'] = proper_tm
                # Push the metric values in batch per container ID
                payload = {"container_id": osm_container_id, "type": metric['type'],
                           "data": metric_values}
                logger.debug("Generic metrics: {}".format(payload))
                publish_metrics(producer, payload)
        except Exception as ex:
            error_logger.exception(ex)
    # tx_rx_metrics holds the metrics required to calculate the packet loss of
    # each container. Indicative sample:
    # --------------------
    # {
    #     "bfbcd872d6b64adcbcd872d6b64adcf1": {
    #         "timestamp": "2019-04-24T07:56:34.158000Z",
    #         "container_network_receive_packets_dropped_total": "0",
    #         "container_network_transmit_packets_dropped_total": "0",
    #         "container_network_transmit_packets_total": "15.5",
    #         "container_network_receive_packets_total": "36"
    #     },
    #     "c7a07d28d38746c6a07d28d38746c630": {
    #         "timestamp": "2019-04-24T07:56:34.158000Z",
    #         "container_network_receive_packets_dropped_total": "0",
    #         "container_network_transmit_packets_dropped_total": "0",
    #         "container_network_transmit_packets_total": "16.875",
    #         "container_network_receive_packets_total": "37.25"
    #     }
    # }
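    # `calculate_packet_loss_values` is imported from `utils` and not shown here.
    # A minimal sketch of the assumed formula (an assumption, not the actual
    # implementation): the dropped and total counts arrive as strings, so roughly
    #   loss = float(dropped) / (float(dropped) + float(total)) * 100
    # and a TypeError is expected when either argument is None (e.g. a metric was
    # never collected), hence the handlers below.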
    for container in tx_rx_metrics:
        # Packet loss in RX
        try:
            container_network_receive_packet_loss_percentage = calculate_packet_loss_values(
                tx_rx_metrics[container].get('container_network_receive_packets_dropped_total'),
                tx_rx_metrics[container].get('container_network_receive_packets_total'))
            rx_payload = {"container_id": container,
                          "type": 'container_network_receive_packet_loss_percentage',
                          "data": [{"timestamp": tx_rx_metrics[container]["timestamp"],
                                    "unit": "%", "type": "counter",
                                    "name": "container_network_receive_packet_loss_percentage",
                                    "value": container_network_receive_packet_loss_percentage}]}
            logger.debug("rx_packet_loss: {}".format(rx_payload))
            publish_metrics(producer, rx_payload)
        except TypeError as ex:
            error_logger.error(ex)
        # Packet loss in TX
        try:
            container_network_transmit_packet_loss_percentage = calculate_packet_loss_values(
                tx_rx_metrics[container].get('container_network_transmit_packets_dropped_total'),
                tx_rx_metrics[container].get('container_network_transmit_packets_total'))
            tx_payload = {"container_id": container,
                          "type": 'container_network_transmit_packet_loss_percentage',
                          "data": [{"timestamp": tx_rx_metrics[container]["timestamp"],
                                    "unit": "%", "type": "counter",
                                    "name": "container_network_transmit_packet_loss_percentage",
                                    "value": container_network_transmit_packet_loss_percentage}]}
            logger.debug("tx_packet_loss: {}".format(tx_payload))
            publish_metrics(producer, tx_payload)
        except TypeError as ex:
            error_logger.error(ex)

    # Close the producer
    producer.close()

def publish_metrics(producer, payload):
    """Publish the payload to the Kafka bus.

    Args:
        producer (KafkaProducer): The Kafka producer instance
        payload (dict): The message to be published

    Returns:
        None
    """
    future = producer.send(KAFKA_KUBERNETES_TOPIC, payload)
    try:
        # Block for up to 3 seconds until the broker acknowledges the message
        future.get(timeout=3)
    except KafkaError as ex:
        error_logger.error(ex)
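
# An indicative message as it appears on the KAFKA_KUBERNETES_TOPIC topic
# (shape taken from the payloads built above; the exact value type depends on
# `calculate_packet_loss_values`):
# {
#     "container_id": "bfbcd872d6b64adcbcd872d6b64adcf1",
#     "type": "container_network_receive_packet_loss_percentage",
#     "data": [{"timestamp": "2019-04-24T07:56:34.158000Z", "unit": "%",
#               "type": "counter",
#               "name": "container_network_receive_packet_loss_percentage",
#               "value": 0.0}]
# }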

if __name__ == '__main__':
    # Run `main` every SCHEDULER_SECONDS seconds
    schedule.every(int(SCHEDULER_SECONDS)).seconds.do(main)
    while True:
        schedule.run_pending()
        time.sleep(1)
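
# For a quick manual check of what this worker publishes, a minimal consumer
# sketch (an illustration, not part of this worker; it reuses the same settings):
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer(KAFKA_KUBERNETES_TOPIC,
#                            bootstrap_servers=KAFKA_SERVER,
#                            api_version=KAFKA_API_VERSION,
#                            value_deserializer=lambda v: json.loads(v.decode('utf-8')))
#   for message in consumer:
#       print(message.value)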