Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zabbix API timeout #69

Merged
merged 3 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ username = "Admin"
password = "zabbix"
dryrun = true
failsafe = 20
timeout = 60 # Zabbix API timeout in seconds (0 = no timeout)
tags_prefix = "zac_"
managed_inventory = ["location"]

Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ console_scripts =
test =
pytest
hypothesis
pytest-timeout
31 changes: 31 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional
import pytest
from pydantic import ValidationError
from zabbix_auto_config import models
Expand Down Expand Up @@ -177,3 +178,33 @@ def test_zacsettings_log_level_serialize() -> None:
# Serialize to JSON:
settings_json = settings.model_dump_json()
assert '"log_level":"INFO"' in settings_json

@pytest.mark.parametrize(
"timeout,expect",
[
(1, 1),
(60, 60),
(1234, 1234),
(0, None),
pytest.param(
-1,
None,
marks=pytest.mark.xfail(
reason="Timeout must be 0 or greater.",
strict=True,
raises=ValidationError,
),
id="-1"
)
]
)
def test_zabbix_settings_timeout(timeout: int, expect: Optional[int]) -> None:
settings = models.ZabbixSettings(
map_dir="",
url="",
username="",
password="",
dryrun=False,
timeout=timeout,
)
assert settings.timeout == expect
62 changes: 28 additions & 34 deletions tests/test_processing/test_sourcecollectorprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import time
from typing import List

import pytest

from zabbix_auto_config.processing import SourceCollectorProcess
from zabbix_auto_config.models import Host, SourceCollectorSettings

Expand All @@ -16,12 +18,7 @@ def collect(*args, **kwargs) -> List[Host]:
return [host, host]


class FaultySourceCollector:
@staticmethod
def collect(*args, **kwargs) -> List[Host]:
raise Exception("Source collector error!!")


@pytest.mark.timeout(5)
def test_source_collector_process():
process = SourceCollectorProcess(
name="test-source",
Expand All @@ -38,25 +35,25 @@ def test_source_collector_process():
source_hosts_queue=multiprocessing.Queue(),
)

process.start()
try:
process.start()
hosts = process.source_hosts_queue.get()
assert len(hosts["hosts"]) == 2
assert hosts["hosts"][0].hostname == "foo.example.com"
assert process.state["ok"] is True
finally:
process.stop_event.set()
process.join(timeout=0.01)

# FIXME: this is potentially flaky!
# We wait for the process to start up completely before we
# set the stop event. In order to not have to rewrite the class,
# we just wait for a bit. This is not ideal.
# The alternative is passing in a an Event which is set when the class
# enters run() for the first time. However, this would require a rewrite
# of the class and all callers of it, so we'll just wait for now.
time.sleep(0.5) # wait for process to start
process.stop_event.set()
process.join(timeout=0.1)

hosts = process.source_hosts_queue.get()
assert len(hosts["hosts"]) == 2
assert hosts["hosts"][0].hostname == "foo.example.com"
assert process.state["ok"] is True
# NOTE: Has to be defined in the global scope to be pickleable by multiprocessing
class FaultySourceCollector:
@staticmethod
def collect(*args, **kwargs) -> List[Host]:
raise Exception("Source collector error!!")


@pytest.mark.timeout(5)
def test_source_collector_disable_on_failure():
process = SourceCollectorProcess(
name="test-source",
Expand All @@ -72,17 +69,14 @@ def test_source_collector_disable_on_failure():
),
source_hosts_queue=multiprocessing.Queue(),
)
# FIXME: potentially flaky test!
# In addition to the problem described in the test above,
# if we terminate the process before it has the chance to
# set state["ok"] to False, the test will fail.
process.start()
time.sleep(1.5) # wait for process to start
process.stop_event.set()
process.join(timeout=0.5)

assert process.state["ok"] is False
assert process.source_hosts_queue.empty() is True

# TODO: assert that process is disabled.
# We probably need to add disablement info to state dict
# Start process and wait until it fails
try:
process.start()
while process.state["ok"] is True:
time.sleep(0.01)
assert process.state["ok"] is False
assert process.source_hosts_queue.empty() is True
process.stop_event.set()
finally:
process.join(timeout=0.01)
83 changes: 83 additions & 0 deletions tests/test_processing/test_zabbixupdater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import multiprocessing
from pathlib import Path
import time
from unittest.mock import MagicMock, patch, Mock
import pytest
import requests
from zabbix_auto_config import exceptions

from zabbix_auto_config.models import ZabbixSettings
from zabbix_auto_config.processing import ZabbixUpdater


def raises_connect_timeout(*args, **kwargs):
raise requests.exceptions.ConnectTimeout("connect timeout")


@pytest.mark.timeout(10)
@patch("psycopg2.connect", MagicMock()) # throwaway mock
def test_zabbixupdater_connect_timeout():
with pytest.raises(exceptions.ZACException) as exc_info:
with patch(
"pyzabbix.ZabbixAPI.login", new_callable=lambda: raises_connect_timeout
):
ZabbixUpdater(
name="connect-timeout",
db_uri="",
state=multiprocessing.Manager().dict(),
zabbix_config=ZabbixSettings(
map_dir="",
url="",
username="",
password="",
dryrun=False,
timeout=1,
),
)
assert "connect timeout" in exc_info.exconly()


class TimeoutUpdater(ZabbixUpdater):
def do_update(self):
raise requests.exceptions.ReadTimeout("read timeout")


class PickableMock(MagicMock):
def __reduce__(self):
return (MagicMock, ())


@pytest.mark.timeout(5)
@patch("psycopg2.connect", PickableMock())
@patch("pyzabbix.ZabbixAPI", PickableMock())
def test_zabbixupdater_read_timeout(tmp_path: Path):
# TODO: use mapping file fixtures from #67
map_dir = tmp_path / "maps"
map_dir.mkdir()
(map_dir / "property_template_map.txt").touch()
(map_dir / "property_hostgroup_map.txt").touch()
(map_dir / "siteadmin_hostgroup_map.txt").touch()

process = TimeoutUpdater(
name="read-timeout",
db_uri="",
state=multiprocessing.Manager().dict(),
zabbix_config=ZabbixSettings(
map_dir=str(map_dir),
url="",
username="",
password="",
dryrun=False,
timeout=1,
),
)

# Start the process and wait for it to be marked as unhealthy
try:
process.start()
while process.state["ok"] is True:
time.sleep(0.1)
assert process.state["ok"] is False
process.stop_event.set()
finally:
process.join(timeout=0.01)
11 changes: 11 additions & 0 deletions zabbix_auto_config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class ZabbixSettings(ConfigBaseModel):
username: str
password: str
dryrun: bool
timeout: Optional[int] = Field(
60,
description="The timeout in seconds for HTTP requests to Zabbix.",
ge=0,
)

tags_prefix: str = "zac_"
managed_inventory: List[str] = []
Expand All @@ -66,6 +71,12 @@ class ZabbixSettings(ConfigBaseModel):
# These groups are not managed by ZAC beyond creating them.
extra_siteadmin_hostgroup_prefixes: Set[str] = set()

@field_validator("timeout")
@classmethod
def _validate_timeout(cls, v: Optional[int]) -> Optional[int]:
if v == 0:
return None
return v

class ZacSettings(ConfigBaseModel):
source_collector_dir: str
Expand Down
13 changes: 12 additions & 1 deletion zabbix_auto_config/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def run(self):
except exceptions.ZACException as e:
logging.error("Work exception: %s", str(e))
self.state["ok"] = False
except requests.exceptions.Timeout as e:
logging.error("Timeout exception: %s", str(e))
self.state["ok"] = False

if self.update_interval > 1 and self.next_update < datetime.datetime.now():
# Only log warning when update_interval is actually changed from default
Expand Down Expand Up @@ -561,7 +564,10 @@ def __init__(self, name, state, db_uri, zabbix_config: models.ZabbixSettings):
pyzabbix_logger = logging.getLogger("pyzabbix")
pyzabbix_logger.setLevel(logging.ERROR)

self.api = pyzabbix.ZabbixAPI(self.config.url)
self.api = pyzabbix.ZabbixAPI(
self.config.url,
timeout=self.config.timeout, # timeout for connect AND read
)
try:
self.api.login(self.config.username, self.config.password)
except requests.exceptions.ConnectionError as e:
Expand All @@ -570,6 +576,11 @@ def __init__(self, name, state, db_uri, zabbix_config: models.ZabbixSettings):
except (pyzabbix.ZabbixAPIException, requests.exceptions.HTTPError) as e:
logging.error("Unable to login to Zabbix API: %s", str(e))
raise exceptions.ZACException(*e.args)
except requests.exceptions.Timeout as e:
logging.error(
"Timed out while connecting to Zabbix API: %s", self.config.url
)
raise exceptions.ZACException(*e.args)

self.property_template_map = utils.read_map_file(
os.path.join(self.config.map_dir, "property_template_map.txt")
Expand Down
Loading