diff --git a/config.sample.toml b/config.sample.toml index 36ec450..d1aafd5 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -12,6 +12,7 @@ username = "Admin" password = "zabbix" dryrun = true failsafe = 20 +timeout = 60 # Zabbix API timeout in seconds (0 = no timeout) tags_prefix = "zac_" managed_inventory = ["location"] diff --git a/setup.cfg b/setup.cfg index 777a927..9db9169 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,3 +42,4 @@ console_scripts = test = pytest hypothesis + pytest-timeout diff --git a/tests/test_models.py b/tests/test_models.py index ca9017b..7e0c8eb 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,4 +1,5 @@ import logging +from typing import Optional import pytest from pydantic import ValidationError from zabbix_auto_config import models @@ -177,3 +178,33 @@ def test_zacsettings_log_level_serialize() -> None: # Serialize to JSON: settings_json = settings.model_dump_json() assert '"log_level":"INFO"' in settings_json + +@pytest.mark.parametrize( + "timeout,expect", + [ + (1, 1), + (60, 60), + (1234, 1234), + (0, None), + pytest.param( + -1, + None, + marks=pytest.mark.xfail( + reason="Timeout must be 0 or greater.", + strict=True, + raises=ValidationError, + ), + id="-1" + ) + ] +) +def test_zabbix_settings_timeout(timeout: int, expect: Optional[int]) -> None: + settings = models.ZabbixSettings( + map_dir="", + url="", + username="", + password="", + dryrun=False, + timeout=timeout, + ) + assert settings.timeout == expect \ No newline at end of file diff --git a/tests/test_processing/test_sourcecollectorprocess.py b/tests/test_processing/test_sourcecollectorprocess.py index b841063..f507274 100644 --- a/tests/test_processing/test_sourcecollectorprocess.py +++ b/tests/test_processing/test_sourcecollectorprocess.py @@ -2,6 +2,8 @@ import time from typing import List +import pytest + from zabbix_auto_config.processing import SourceCollectorProcess from zabbix_auto_config.models import Host, SourceCollectorSettings @@ -16,12 +18,7 @@ def collect(*args, **kwargs) -> List[Host]: return [host, host] -class FaultySourceCollector: - @staticmethod - def collect(*args, **kwargs) -> List[Host]: - raise Exception("Source collector error!!") - - +@pytest.mark.timeout(5) def test_source_collector_process(): process = SourceCollectorProcess( name="test-source", @@ -38,25 +35,25 @@ def test_source_collector_process(): source_hosts_queue=multiprocessing.Queue(), ) - process.start() + try: + process.start() + hosts = process.source_hosts_queue.get() + assert len(hosts["hosts"]) == 2 + assert hosts["hosts"][0].hostname == "foo.example.com" + assert process.state["ok"] is True + finally: + process.stop_event.set() + process.join(timeout=0.01) - # FIXME: this is potentially flaky! - # We wait for the process to start up completely before we - # set the stop event. In order to not have to rewrite the class, - # we just wait for a bit. This is not ideal. - # The alternative is passing in a an Event which is set when the class - # enters run() for the first time. However, this would require a rewrite - # of the class and all callers of it, so we'll just wait for now. - time.sleep(0.5) # wait for process to start - process.stop_event.set() - process.join(timeout=0.1) - hosts = process.source_hosts_queue.get() - assert len(hosts["hosts"]) == 2 - assert hosts["hosts"][0].hostname == "foo.example.com" - assert process.state["ok"] is True +# NOTE: Has to be defined in the global scope to be pickleable by multiprocessing +class FaultySourceCollector: + @staticmethod + def collect(*args, **kwargs) -> List[Host]: + raise Exception("Source collector error!!") +@pytest.mark.timeout(5) def test_source_collector_disable_on_failure(): process = SourceCollectorProcess( name="test-source", @@ -72,17 +69,14 @@ def test_source_collector_disable_on_failure(): ), source_hosts_queue=multiprocessing.Queue(), ) - # FIXME: potentially flaky test! - # In addition to the problem described in the test above, - # if we terminate the process before it has the chance to - # set state["ok"] to False, the test will fail. - process.start() - time.sleep(1.5) # wait for process to start - process.stop_event.set() - process.join(timeout=0.5) - - assert process.state["ok"] is False - assert process.source_hosts_queue.empty() is True - # TODO: assert that process is disabled. - # We probably need to add disablement info to state dict + # Start process and wait until it fails + try: + process.start() + while process.state["ok"] is True: + time.sleep(0.01) + assert process.state["ok"] is False + assert process.source_hosts_queue.empty() is True + process.stop_event.set() + finally: + process.join(timeout=0.01) diff --git a/tests/test_processing/test_zabbixupdater.py b/tests/test_processing/test_zabbixupdater.py new file mode 100644 index 0000000..5b51f4c --- /dev/null +++ b/tests/test_processing/test_zabbixupdater.py @@ -0,0 +1,83 @@ +import multiprocessing +from pathlib import Path +import time +from unittest.mock import MagicMock, patch, Mock +import pytest +import requests +from zabbix_auto_config import exceptions + +from zabbix_auto_config.models import ZabbixSettings +from zabbix_auto_config.processing import ZabbixUpdater + + +def raises_connect_timeout(*args, **kwargs): + raise requests.exceptions.ConnectTimeout("connect timeout") + + +@pytest.mark.timeout(10) +@patch("psycopg2.connect", MagicMock()) # throwaway mock +def test_zabbixupdater_connect_timeout(): + with pytest.raises(exceptions.ZACException) as exc_info: + with patch( + "pyzabbix.ZabbixAPI.login", new_callable=lambda: raises_connect_timeout + ): + ZabbixUpdater( + name="connect-timeout", + db_uri="", + state=multiprocessing.Manager().dict(), + zabbix_config=ZabbixSettings( + map_dir="", + url="", + username="", + password="", + dryrun=False, + timeout=1, + ), + ) + assert "connect timeout" in exc_info.exconly() + + +class TimeoutUpdater(ZabbixUpdater): + def do_update(self): + raise requests.exceptions.ReadTimeout("read timeout") + + +class PickableMock(MagicMock): + def __reduce__(self): + return (MagicMock, ()) + + +@pytest.mark.timeout(5) +@patch("psycopg2.connect", PickableMock()) +@patch("pyzabbix.ZabbixAPI", PickableMock()) +def test_zabbixupdater_read_timeout(tmp_path: Path): + # TODO: use mapping file fixtures from #67 + map_dir = tmp_path / "maps" + map_dir.mkdir() + (map_dir / "property_template_map.txt").touch() + (map_dir / "property_hostgroup_map.txt").touch() + (map_dir / "siteadmin_hostgroup_map.txt").touch() + + process = TimeoutUpdater( + name="read-timeout", + db_uri="", + state=multiprocessing.Manager().dict(), + zabbix_config=ZabbixSettings( + map_dir=str(map_dir), + url="", + username="", + password="", + dryrun=False, + timeout=1, + ), + ) + + # Start the process and wait for it to be marked as unhealthy + try: + process.start() + while process.state["ok"] is True: + time.sleep(0.1) + assert process.state["ok"] is False + process.stop_event.set() + finally: + process.join(timeout=0.01) diff --git a/zabbix_auto_config/models.py b/zabbix_auto_config/models.py index 31b1473..30a6213 100644 --- a/zabbix_auto_config/models.py +++ b/zabbix_auto_config/models.py @@ -46,6 +46,11 @@ class ZabbixSettings(ConfigBaseModel): username: str password: str dryrun: bool + timeout: Optional[int] = Field( + 60, + description="The timeout in seconds for HTTP requests to Zabbix.", + ge=0, + ) tags_prefix: str = "zac_" managed_inventory: List[str] = [] @@ -66,6 +71,12 @@ class ZabbixSettings(ConfigBaseModel): # These groups are not managed by ZAC beyond creating them. extra_siteadmin_hostgroup_prefixes: Set[str] = set() + @field_validator("timeout") + @classmethod + def _validate_timeout(cls, v: Optional[int]) -> Optional[int]: + if v == 0: + return None + return v class ZacSettings(ConfigBaseModel): source_collector_dir: str diff --git a/zabbix_auto_config/processing.py b/zabbix_auto_config/processing.py index ec99e8d..8b4fb94 100644 --- a/zabbix_auto_config/processing.py +++ b/zabbix_auto_config/processing.py @@ -66,6 +66,9 @@ def run(self): except exceptions.ZACException as e: logging.error("Work exception: %s", str(e)) self.state["ok"] = False + except requests.exceptions.Timeout as e: + logging.error("Timeout exception: %s", str(e)) + self.state["ok"] = False if self.update_interval > 1 and self.next_update < datetime.datetime.now(): # Only log warning when update_interval is actually changed from default @@ -561,7 +564,10 @@ def __init__(self, name, state, db_uri, zabbix_config: models.ZabbixSettings): pyzabbix_logger = logging.getLogger("pyzabbix") pyzabbix_logger.setLevel(logging.ERROR) - self.api = pyzabbix.ZabbixAPI(self.config.url) + self.api = pyzabbix.ZabbixAPI( + self.config.url, + timeout=self.config.timeout, # timeout for connect AND read + ) try: self.api.login(self.config.username, self.config.password) except requests.exceptions.ConnectionError as e: @@ -570,6 +576,11 @@ def __init__(self, name, state, db_uri, zabbix_config: models.ZabbixSettings): except (pyzabbix.ZabbixAPIException, requests.exceptions.HTTPError) as e: logging.error("Unable to login to Zabbix API: %s", str(e)) raise exceptions.ZACException(*e.args) + except requests.exceptions.Timeout as e: + logging.error( + "Timed out while connecting to Zabbix API: %s", self.config.url + ) + raise exceptions.ZACException(*e.args) self.property_template_map = utils.read_map_file( os.path.join(self.config.map_dir, "property_template_map.txt")