Skip to content

Commit

Permalink
ZMQ needs to reconnect on forked processes(follow up to saltstack#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
s0undt3ch committed Jun 17, 2021
1 parent 7e510bf commit 85fbb47
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .pylint-spelling-words
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ maxdepth
minify
msg
msgpack
mworker
namespace
netbsd
ng
Expand All @@ -102,6 +103,7 @@ popen
pragma
prepend
proc
processpayload
processresult
proxied
psutil
Expand Down
1 change: 1 addition & 0 deletions changelog/69.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ZMQ needs to reconnect on forked processes or else Salt's own multiprocessing log forwarding log records won't be logged by the ``ZMQHandler``
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ def setup_handlers():


class ZMQHandler(ExcInfoOnLogLevelFormatMixin, logging.Handler, NewStyleClassMixin):

# We implement a lazy start approach which is deferred until sending the
# first message because, logging handlers, on platforms which support
# forking, are inherited by forked processes, and we want to minimize the ZMQ
# machinery inherited.
# For the cases where the ZMQ machinery is still inherited because a
# process was forked after ZMQ has been prepped up, we check the handler's
# pid attribute against the current process pid. If it's not a match, we
# reconnect the ZMQ machinery.

def __init__(
self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.NOTSET, socket_hwm=100000
):
Expand All @@ -145,6 +155,7 @@ def __init__(
# We set the formatter so that we only include the actual log message and not any other
# fields found in the log record
self.__formatter = logging.Formatter("%(message)s")
self.pid = os.getpid()

def _get_formatter(self):
return self.__formatter
Expand Down Expand Up @@ -192,6 +203,10 @@ def _get_log_prefix(self, log_prefix):
return log_prefix.format(cli_name=cli_name)

def start(self):
if self.pid != os.getpid():
self.stop()
self._exiting = False

if self._exiting is True:
return

Expand Down Expand Up @@ -233,6 +248,8 @@ def start(self):
self._exiting = False
return

self.pid = os.getpid()

def stop(self):
if self._exiting:
return
Expand Down Expand Up @@ -265,7 +282,7 @@ def stop(self):
sys.stderr.flush()
raise
finally:
self.context = self.pusher = None
self.context = self.pusher = self.pid = None

def format(self, record):
msg = super(ZMQHandler, self).format(record)
Expand Down
Empty file.
Empty file.
41 changes: 41 additions & 0 deletions tests/integration/utils/salt/test_log_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import logging

import pytest

from saltfactories.utils import random_string


@pytest.fixture(scope="module")
def master(salt_factories):
factory = salt_factories.salt_master_daemon(random_string("master-"))
with factory.started():
yield factory


@pytest.fixture(scope="module")
def minion(master):
factory = master.salt_minion_daemon(random_string("minion-"))
with factory.started():
yield factory


@pytest.fixture
def salt_cli(master):
return master.get_salt_cli()


def test_logs_forwarded_from_sub_processes(salt_cli, minion, caplog):
assert minion.is_running()

with caplog.at_level(logging.DEBUG):
ret = salt_cli.run("test.ping", minion_tgt=minion.id)
assert ret.exitcode == 0, ret
assert ret.json is True

non_main_processes_count = 0
for record in caplog.records:
if record.processName != "MainProcess":
non_main_processes_count += 1

# We should see at least a log record from the MWorker and ProcessPayload processes
assert non_main_processes_count >= 2

0 comments on commit 85fbb47

Please sign in to comment.