Skip to content

Commit

Permalink
Additional improvements to the ZMQHandler reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
s0undt3ch committed Jun 16, 2021
1 parent 255f1c1 commit b8e1d08
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 29 deletions.
1 change: 1 addition & 0 deletions .pylint-spelling-words
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ globals
hookspec
hostname
https
hwm
ico
ie
illumos
Expand Down
Empty file.
91 changes: 62 additions & 29 deletions src/saltfactories/utils/salt/log_handlers/pytest_log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
Salt External Logging Handler
"""
import atexit
import copy
import logging
import os
import pprint
import socket
import sys
import time
import traceback

try:
Expand Down Expand Up @@ -139,29 +139,41 @@ class ZMQHandler(ExcInfoOnLogLevelFormatMixin, logging.Handler, NewStyleClassMix
# pid attribute against the current process pid. If it's not a match, we
# reconnect the ZMQ machinery.

def __init__(self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.NOTSET):
def __init__(
    self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.NOTSET, socket_hwm=100000
):
    """
    Set up the handler state without opening any ZMQ resources.

    :param host: Host the PUSH socket connects to (``tcp://host:port``).
    :param port: Port the PUSH socket connects to.
    :param log_prefix: Optional prefix template, resolved through ``_get_log_prefix``.
    :param level: Logging level handed to ``logging.Handler``.
    :param socket_hwm: High water mark applied to the PUSH socket.
    """
    super(ZMQHandler, self).__init__(level=level)
    # Process id bookkeeping; nothing is recorded at construction time
    self.pid = None
    self.host = host
    self.port = port
    # Keep the raw prefix template so ``__getstate__`` can hand it back to ``__init__``
    self._log_prefix = log_prefix
    self.socket_hwm = socket_hwm
    self.log_prefix = self._get_log_prefix(log_prefix)
    # Nothing is connected yet, both the context and the socket start out as None
    self.context = self.pusher = None
    self._exiting = False
    # Number of messages dropped because they could neither be sent nor queued
    self.dropped_messages_count = 0
    # We set the formatter so that we only include the actual log message and not any other
    # fields found in the log record
    self.__formatter = logging.Formatter("%(message)s")

def __getstate__(self):
    """
    Return the constructor arguments needed to rebuild this handler.

    Only plain configuration values are included; the ZMQ context and socket
    are left out of the returned mapping.
    """
    state = {}
    state["host"] = self.host
    state["port"] = self.port
    # The raw template is stored, not the already formatted prefix
    state["log_prefix"] = self._log_prefix
    state["level"] = self.level
    state["socket_hwm"] = self.socket_hwm
    return state

def __setstate__(self, state):
    """
    Rebuild the handler from a saved state mapping.

    :param state: Keyword arguments accepted by ``__init__``.
    """
    # Re-run the constructor with the saved configuration
    self.__init__(**state)
    # ``stop()`` flags the handler as exiting, so the flag is cleared right
    # afterwards; otherwise ``start()`` would return early.
    self.stop()
    self._exiting = False

def __repr__(self):
    """
    Return a short description holding the class name, host, port and level name.
    """
    class_name = self.__class__.__name__
    # Show the symbolic level name rather than the numeric value
    level_name = logging.getLevelName(self.level)
    return "<{} host={} port={} level={}>".format(class_name, self.host, self.port, level_name)

def _get_log_prefix(self, log_prefix):
if log_prefix is None:
return
Expand All @@ -172,20 +184,28 @@ def _get_log_prefix(self, log_prefix):
cli_name = os.path.basename(sys.argv[cli_arg_idx])
return log_prefix.format(cli_name=cli_name)

def start(self):
if self.pid and self.pid != os.getpid():
# This is not the starting pid, reconnect
self.stop(flush=False)
self._exiting = False
def _get_formatter(self):
    """
    Return the message-only formatter created in ``__init__``.
    """
    return self.__formatter

def setFormatter(self, fmt):
    """
    Refuse any attempt to replace the formatter.

    :raises RuntimeError: always.
    """
    message = "Do not set a formatter on {}".format(self.__class__.__name__)
    raise RuntimeError(message)

def _set_formatter(self, fmt):
    """
    Property setter: ignore ``None``, reject everything else via ``setFormatter``.
    """
    if fmt is None:
        return
    self.setFormatter(fmt)

# ``formatter`` is exposed as a property so that assigning a real formatter
# fails loudly instead of silently replacing the message-only one
formatter = property(_get_formatter, _set_formatter)

def start(self):
if self._exiting is True:
return

if self.pusher is not None:
# We're running ...
return

atexit.register(self.stop)
self.dropped_messages_count = 0
context = pusher = None
try:
context = zmq.Context()
Expand All @@ -202,7 +222,7 @@ def start(self):

try:
pusher = context.socket(zmq.PUSH)
pusher.set_hwm(100000)
pusher.set_hwm(self.socket_hwm)
pusher.connect("tcp://{}:{}".format(self.host, self.port))
self.pusher = pusher
except zmq.ZMQError as exc:
Expand All @@ -219,31 +239,24 @@ def start(self):
self._exiting = False
return

self.pid = os.getpid()

def stop(self, flush=True):
def stop(self):
if self._exiting:
return

self._exiting = True

try:
atexit.unregister(self.stop)
except AttributeError: # pragma: no cover
# Python 2
try:
atexit._exithandlers.remove((self.stop, (), {}))
except ValueError:
# The exit handler isn't registered
pass
if self.dropped_messages_count:
sys.stderr.write(
"Dropped {} messages from getting forwarded. High water mark reached...\n".format(
self.dropped_messages_count
)
)
sys.stderr.flush()

try:
if self.pusher is not None and not self.pusher.closed:
if flush is True:
# Give it 1.5 seconds to flush any messages in its queue
self.pusher.close(1500)
else:
self.pusher.close()
# Give it 1.5 seconds to flush any messages in its queue
self.pusher.close(1500)
self.pusher = None
if self.context is not None and not self.context.closed:
self.context.term()
Expand All @@ -258,7 +271,7 @@ def stop(self, flush=True):
sys.stderr.flush()
raise
finally:
self.context = self.pusher = self.pid = None
self.context = self.pusher = None

def format(self, record):
msg = super(ZMQHandler, self).format(record)
Expand Down Expand Up @@ -310,14 +323,34 @@ def emit(self, record):
try:
msg = self.prepare(record)
if msg:
self.pusher.send(msg)
try:
self._send_message(msg)
except zmq.error.Again:
# Sleep a little and give up
time.sleep(0.001)
try:
self._send_message(msg)
except zmq.error.Again:
pass
# We can't send it nor queue it for send.
# Drop it, otherwise, this call blocks until we can at least queue the message
self.dropped_messages_count += 1
except (SystemExit, KeyboardInterrupt): # pragma: no cover pylint: disable=try-except-raise
# Catch and raise SystemExit and KeyboardInterrupt so that we can handle
# all other exception below
raise
except Exception: # pragma: no cover pylint: disable=broad-except
self.handleError(record)

def _send_message(self, msg):
    """
    Hand ``msg`` to the PUSH socket without blocking and report earlier drops.

    After a successful send, a pending dropped-message count is logged at
    debug level and the counter is cleared.

    Exceptions raised by the socket are not caught here; ``emit`` handles
    ``zmq.error.Again`` around this call.

    :param msg: The payload to send.
    """
    self.pusher.send(msg, flags=zmq.NOBLOCK)
    dropped = self.dropped_messages_count
    if dropped:
        # Reset the counter BEFORE logging. The debug record below can be routed
        # back to this very handler, which re-enters this method; with the
        # counter still set, every nested call would log again and recurse
        # without bound.
        self.dropped_messages_count = 0
        logging.getLogger(__name__).debug(
            "Dropped %s messages from getting forwarded. High water mark reached...",
            dropped,
        )

def close(self):
"""
Tidy up any resources used by the handler.
Expand Down
Empty file.
Loading

0 comments on commit b8e1d08

Please sign in to comment.