-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathZabbixFactory.py
283 lines (215 loc) · 10.9 KB
/
ZabbixFactory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from abc import ABC
from typing import Union, Generator
from .Zabbix import *
class ZabbixFactory(Zabbix, ABC):
pass
class ZabbixProxyFactory(ZabbixFactory):
def __make(self, proxy: dict):
return ZabbixProxy(self._zapi, proxy)
@zapi_exception("Ошибка получения Zabbix прокси")
def __get(self, **options) -> list:
return self._zapi.proxy.get(**options)
def get_by_filter(self, _filter: dict, **options) -> Generator[ZabbixProxy, None, None]:
"""Получение списка объектов ZabbixGroup из ZabbixAPI по фильтру"""
z_proxies: list = self.__get(filter=_filter, **options)
return (self.__make(proxy) for proxy in z_proxies)
def get_by_host(self, _name: Union[str, List[str]]):
"""Получение списка объекторв ZabbixProxy из ZabbixAPI по имени"""
z_hosts = self.get_by_filter({'host': _name})
return z_hosts
class ZabbixGroupFactory(ZabbixFactory):
def __make(self, group: dict):
return ZabbixGroup(self._zapi, group)
@zapi_exception("Ошибка получения Zabbix группы", logging.CRITICAL)
def __get(self, **options) -> list:
return self._zapi.hostgroup.get(**options)
def get_by_id(self, groupid: int):
"""Создание объекта ZabbixGroup из ZabbixAPI"""
z_group = self.__get(groupids=[groupid])[0]
return self.__make(z_group)
def get_by_filter(self, _filter: dict) -> Generator[ZabbixGroup, None, None]:
"""Получение списка объектов ZabbixGroup из ZabbixAPI по фильтру"""
z_groups = self.__get(filter=_filter)
return (self.__make(group) for group in z_groups)
def get_by_name(self, _name: Union[str, List[str]]):
"""Получение списка объекторв ZabbixGroup из ZabbixAPI по имени"""
return self.get_by_filter({'name': _name})
@zapi_exception("Ошибка создания Zabbix группы")
def create(self, groupname: str):
"""Создание нового узлв в ZabbixAPI"""
z_groups = self._zapi.hostgroup.create(name=groupname)
return self.__make({'groupid': z_groups.get('groupids')[0]})
class ZabbixMacroFactory(ZabbixFactory):
def __make(self, macro: dict):
return ZabbixMacro(self._zapi, macro)
@zapi_exception("Ошибка получения Zabbix макроса")
def __get(self, **options) -> list:
return self._zapi.usermacro.get(**options)
def get_by_filter(self, _filter: dict, **options):
"""Получение макроса из ZabbixAPI по фильтру"""
z_macros = self.__get(filter=_filter, **options)
return (self.__make(m) for m in z_macros)
def get_by_macro(self, name: str, value: str):
return self.get_by_filter({'macro': name}, search={'value': value}, searchWildcardsEnabled=True)
def create(self, hostid: int, macro: str, value: str = ''):
"""Создание нового макроса в ZabbixAPI"""
return ZabbixMacro.create(self._zapi, hostid, macro, value)
class ZabbixTemplateFactory(ZabbixFactory):
def __make(self, template: dict):
return ZabbixTemplate(self._zapi, template)
@zapi_exception("Ошибка получения Zabbix шаблона")
def __get(self, **options) -> list:
return self._zapi.template.get(**options)
def get_by_filter(self, _filter: dict, **options):
"""Получение шаблона из ZabbixAPI по фильтру"""
z_templates = self.__get(filter=_filter, **options)
return (self.__make(t) for t in z_templates)
def get_by_name(self, template_name: str):
"""Получение шаблона из ZabbixAPI по имени"""
return self.get_by_filter({'host': template_name})
def get_by_group(self, group: ZabbixGroup):
"""Получение списка узлов ZabbixHost из ZabbixAPI по видимому имени"""
return self.__get(groupids=group.groupid)
class ZabbixInterfaceFactory(ZabbixFactory):
def __make(self, interface: dict):
return ZabbixInterface(self._zapi, interface)
@zapi_exception("Ошибка получения Zabbix узла")
def __get(self, **options) -> list:
return self._zapi.hostinterface.get(**options)
def get_by_id(self, interfaceid: int):
"""Создание объекта ZabbixInterface из ZabbixAPI"""
z_interface = self.__get(interfaceid=interfaceid)[0]
return self.__make(z_interface)
class ZabbixHostFactory(ZabbixFactory):
def __make(self, host: dict):
return ZabbixHost(self._zapi, host)
@zapi_exception("Ошибка получения Zabbix узла")
def __get(self, **options) -> list:
return self._zapi.host.get(**options)
def get_by_id(self, hostid: int):
"""Создание объекта ZabbixHost из ZabbixAPI"""
z_host = self.__get(hostids=hostid)[0]
return self.__make(z_host)
def get_by_filter(self, _filter: dict, **options):
"""Получение списка объектов ZabbixHost из ZabbixAPI по фильтру"""
z_hosts = self.__get(filter=_filter, **options)
return (self.__make(z_host) for z_host in z_hosts)
def get_by_name(self, _name: str):
"""Получение списка узлов ZabbixHost из ZabbixAPI по видимому имени"""
return self.get_by_filter({'host': _name})
def get_by_group(self, group: ZabbixGroup):
"""Получение списка узлов ZabbixHost из ZabbixAPI по видимому имени"""
hosts = self.__get(groupids=group.groupid)
return (self.__make(host) for host in hosts)
def search(self, _search: dict, **options):
"""Поиск в ZabbixAPI"""
z_hosts = self.__get(search=_search, searchWildcardsEnabled=True, **options)
return (self.__make(host) for host in z_hosts)
@zapi_exception("Ошибка создания Zabbix узла")
def create(self, host: dict):
"""Создание узла в ZabbixAPI
:param host: Словарь Zabbix узла.
Обязательные ключи: host, groups, interfaces
:return: Созданный ZabbixHost объект
:rtype: ZabbixHost
"""
if not host.get('host') or not host.get('groups') or not host.get('interfaces'):
raise KeyError
z_host = dict()
z_host['hostid'] = self._zapi.host.create(**host)['hostids'][0]
return self.__make(z_host)
class ZabbixTriggerFactory(ZabbixFactory):
def __make(self, trigger: dict):
host = self._get_host_by_triggerid(int(trigger['triggerid']))
return ZabbixTrigger(host, trigger)
@zapi_exception("Ошибка получения Zabbix узла по триггеру")
def __get(self, **options) -> list:
return self._zapi.trigger.get(**options)
def _get_host_by_triggerid(self, triggerid: int):
z_host = self.__get(
triggerids=triggerid,
selectHosts='extend',
)[0]['hosts'][0]
return ZabbixHost(self._zapi, z_host)
def get_by_id(self, triggerid: int):
z_trigger = self.__get(
triggerids=[triggerid],
expandExpression='true',
expandDescription='true',
expandData='true',
selectHosts='extend',
)[0]
return self.__make(z_trigger)
def get_by_filter(self, _filter: dict, **options):
"""Получение списка Zabbix триггеров из ZabbixAPI по фильтру"""
z_triggers = self.__get(filter=_filter, **options)
return (self.__make(z_trigger) for z_trigger in z_triggers)
class ZabbixEventFactory(ZabbixFactory):
def __make(self, event: dict):
trigger = self._get_trigger_by_eventid(event['eventid'])
return ZabbixEvent(trigger, event)
@zapi_exception("Ошибка получения Zabbix триггера по событию")
def __get(self, **options) -> list:
return self._zapi.event.get(**options)
def _get_trigger_by_eventid(self, eventid: int):
z_event = self.__get(
output=['relatedObject', 'hosts'],
eventids=eventid,
selectRelatedObject='extend',
selectHosts='extend',
)[0]
z_trigger = z_event['relatedObject']
z_host = z_event['hosts'][0]
host = ZabbixHost(self._zapi, z_host)
return ZabbixTrigger(host, z_trigger)
def get_by_id(self, eventid: int):
"""Создание объекта ZabbixEvent из ZabbixAPI"""
z_event = self.__get(eventids=[eventid])[0]
return self.__make(z_event)
def get_by_trigger(self, trigger: ZabbixTrigger, limit=10, **options):
event_get = dict(
objectids=trigger.triggerid,
sortfield=['clock', 'eventid'],
sortorder='DESC', # сортировка от более нового к более старому
limit=limit,
select_acknowledges=['acknowledgeid', 'clock', 'message'],
)
event_get.update(options)
z_events = self.__get(**event_get)
return (self.__make(event) for event in z_events)
class ZabbixProblemFactory(ZabbixEventFactory):
def __make(self, event: dict):
trigger = self._get_trigger_by_eventid(event['eventid'])
return ZabbixProblem(trigger, event)
@zapi_exception("Ошибка получения Zabbix проблем")
def __get(self, **options) -> list:
return self._zapi.problem.get(**options)
def get_by_id(self, eventid: int, recent=False):
z_problems = self.__get(eventid=eventid, recent=recent)
return (self.__make(problem) for problem in z_problems)
def get_by_tag(self, tag: str, limit: int = 500, **options):
z_events = self.__get(
time_from=int(time.time()) - (3 * 86400),
tags=[{'tag': tag}],
acknowledged=False,
suppressed=False,
**options,
)
if z_events is None or len(z_events) >= limit:
return
return (self.__make(event) for event in z_events)
def get_by_groupids(self, groupids: List[int], limit: int = 500, **options):
"""Генератор событий из групп groupids по ZabbixAPI"""
if groupids is None:
groupids = [10] # Группа по-умолчанию - A4
time_from = int(time.time()) - (86400 * 3) # За три последних дня
z_problems = self.__get(
groupids=groupids,
acknowledged=False,
suppressed=False,
time_from=time_from,
**options,
)
if len(z_problems) >= limit:
return
return (self.__make(problem) for problem in z_problems)