From 260f01da2451a9eaf67b6c761cb5e83c6fc3fc48 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 17 Jun 2021 11:17:19 +0100 Subject: [PATCH] ZMQ needs to reconnect on forked processes(follow up to #64) --- .pylint-spelling-words | 2 + changelog/69.improvement.rst | 1 + .../log_handlers/pytest_log_handler.py | 19 ++++++++- tests/integration/utils/__init__.py | 0 tests/integration/utils/salt/__init__.py | 0 .../utils/salt/test_log_handlers.py | 41 +++++++++++++++++++ 6 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 changelog/69.improvement.rst create mode 100644 tests/integration/utils/__init__.py create mode 100644 tests/integration/utils/salt/__init__.py create mode 100644 tests/integration/utils/salt/test_log_handlers.py diff --git a/.pylint-spelling-words b/.pylint-spelling-words index 4b555200..6afcc82e 100644 --- a/.pylint-spelling-words +++ b/.pylint-spelling-words @@ -84,6 +84,7 @@ maxdepth minify msg msgpack +mworker namespace netbsd ng @@ -102,6 +103,7 @@ popen pragma prepend proc +processpayload processresult proxied psutil diff --git a/changelog/69.improvement.rst b/changelog/69.improvement.rst new file mode 100644 index 00000000..d0350a59 --- /dev/null +++ b/changelog/69.improvement.rst @@ -0,0 +1 @@ +ZMQ needs to reconnect on forked processes or else Salt's own multiprocessing log forwarding log records won't be logged by the ``ZMQHandler`` diff --git a/src/saltfactories/utils/saltext/log_handlers/pytest_log_handler.py b/src/saltfactories/utils/saltext/log_handlers/pytest_log_handler.py index 914772a4..ddc3cd07 100644 --- a/src/saltfactories/utils/saltext/log_handlers/pytest_log_handler.py +++ b/src/saltfactories/utils/saltext/log_handlers/pytest_log_handler.py @@ -130,6 +130,16 @@ def setup_handlers(): class ZMQHandler(ExcInfoOnLogLevelFormatMixin, logging.Handler, NewStyleClassMixin): + + # We implement a lazy start approach which is deferred until sending the + # first message because, logging handlers, on platforms which support + # forking, are inherited by forked processes, and we want to minimize the ZMQ + # machinery inherited. + # For the cases where the ZMQ machinery is still inherited because a + # process was forked after ZMQ has been prepped up, we check the handler's + # pid attribute against the current process pid. If it's not a match, we + # reconnect the ZMQ machinery. + def __init__( self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.NOTSET, socket_hwm=100000 ): @@ -145,6 +155,7 @@ def __init__( # We set the formatter so that we only include the actual log message and not any other # fields found in the log record self.__formatter = logging.Formatter("%(message)s") + self.pid = os.getpid() def _get_formatter(self): return self.__formatter @@ -192,6 +203,10 @@ def _get_log_prefix(self, log_prefix): return log_prefix.format(cli_name=cli_name) def start(self): + if self.pid != os.getpid(): + self.stop() + self._exiting = False + if self._exiting is True: return @@ -233,6 +248,8 @@ def start(self): self._exiting = False return + self.pid = os.getpid() + def stop(self): if self._exiting: return @@ -265,7 +282,7 @@ def stop(self): sys.stderr.flush() raise finally: - self.context = self.pusher = None + self.context = self.pusher = self.pid = None def format(self, record): msg = super(ZMQHandler, self).format(record) diff --git a/tests/integration/utils/__init__.py b/tests/integration/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/utils/salt/__init__.py b/tests/integration/utils/salt/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/utils/salt/test_log_handlers.py b/tests/integration/utils/salt/test_log_handlers.py new file mode 100644 index 00000000..75dd86e4 --- /dev/null +++ b/tests/integration/utils/salt/test_log_handlers.py @@ -0,0 +1,41 @@ +import logging + +import pytest + +from saltfactories.utils import random_string + + +@pytest.fixture(scope="module") +def master(salt_factories): + factory = salt_factories.salt_master_daemon(random_string("master-")) + with factory.started(): + yield factory + + +@pytest.fixture(scope="module") +def minion(master): + factory = master.salt_minion_daemon(random_string("minion-")) + with factory.started(): + yield factory + + +@pytest.fixture +def salt_cli(master): + return master.get_salt_cli() + + +def test_logs_forwarded_from_sub_processes(salt_cli, minion, caplog): + assert minion.is_running() + + with caplog.at_level(logging.DEBUG): + ret = salt_cli.run("test.ping", minion_tgt=minion.id) + assert ret.exitcode == 0, ret + assert ret.json is True + + non_main_processes_count = 0 + for record in caplog.records: + if record.processName != "MainProcess": + non_main_processes_count += 1 + + # We should see at least a log record from the MWorker and ProcessPayload processes + assert non_main_processes_count >= 2