Skip to content

Commit

Permalink
Several fixes/improvements to the ZMQHandler log forwarding handler
Browse files Browse the repository at this point in the history
  • Loading branch information
s0undt3ch committed Jun 14, 2021
1 parent 0b416b1 commit e23bfc3
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 40 deletions.
1 change: 1 addition & 0 deletions changelog/64.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Several fixes/improvements to the ``ZMQHandler`` log forwarding handler
141 changes: 101 additions & 40 deletions src/saltfactories/utils/salt/log_handlers/pytest_log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import socket
import sys
import threading
import time
import traceback

try:
Expand Down Expand Up @@ -147,7 +148,7 @@ class ZMQHandler(ExcInfoOnLogLevelFormatMixin, logging.Handler, NewStyleClassMix
# machinery inherited.
# For the cases where the ZMQ machinery is still inherited because a
# process was forked after ZMQ has been prepped up, we check the handler's
# pid attribute against, the current process pid. If it's not a match, we
# pid attribute against the current process pid. If it's not a match, we
# reconnect the ZMQ machinery.

def __init__(self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.NOTSET):
Expand All @@ -156,6 +157,7 @@ def __init__(self, host="127.0.0.1", port=3330, log_prefix=None, level=logging.N
self.push_address = "tcp://{}:{}".format(host, port)
self.log_prefix = self._get_log_prefix(log_prefix)
self.context = self.proxy_address = self.in_proxy = self.proxy_thread = None
self.running_event = threading.Event()
self._exiting = False

def _get_log_prefix(self, log_prefix):
Expand All @@ -177,6 +179,7 @@ def start(self):
return

if self.in_proxy is not None:
# We're running ...
return

atexit.register(self.stop)
Expand All @@ -186,22 +189,38 @@ def start(self):
self.context = context
except zmq.ZMQError as exc:
sys.stderr.write(
"Failed to create the ZMQ Context: {}\n{}\n".format(exc, traceback.format_exc(exc))
"Failed to create the ZMQ Context: {}\n{}\n".format(exc, traceback.format_exc())
)
sys.stderr.flush()
self.stop()
# Allow the handler to re-try starting
self._exiting = False
return

# Let's start the proxy thread
socket_bind_event = threading.Event()
self.proxy_thread = threading.Thread(
target=self._proxy_logs_target, args=(socket_bind_event,)
target=self._proxy_logs_target, args=(socket_bind_event, self.running_event)
)
self.proxy_thread.daemon = True
self.proxy_thread.start()
# Now that we discovered which random port to use, let's continue with the setup
if socket_bind_event.wait(5) is not True:
sys.stderr.write("Failed to bind the ZMQ socket PAIR\n")
sys.stderr.flush()
socket_bind_event.clear()
self.running_event.clear()
context.term()
if self.proxy_thread.is_alive():
# Wait for the thread to terminate
self.proxy_thread.join(5)
if self.proxy_thread.is_alive():
# Hmm.. Still alive?!
# Wait a little longer
self.proxy_thread.join(5)
self.context = self.proxy_thread = None
self.stop()
# Allow the handler to re-try starting
self._exiting = False
return

# And we can now also connect the messages input side of the proxy
Expand All @@ -214,17 +233,34 @@ def start(self):
if in_proxy is not None:
in_proxy.close(1000)
sys.stderr.write(
"Failed to bind the ZMQ PAIR socket: {}\n{}\n".format(
exc, traceback.format_exc(exc)
)
"Failed to bind the ZMQ PAIR socket: {}\n{}\n".format(exc, traceback.format_exc())
)
sys.stderr.flush()
socket_bind_event.clear()
self.running_event.clear()
self.context.term()
if self.proxy_thread.is_alive():
# Wait for the thread to terminate
self.proxy_thread.join(5)
if self.proxy_thread.is_alive():
# Hmm.. Still alive?!
# Wait a little longer
self.proxy_thread.join(5)
self.context = self.proxy_thread = self.proxy_address = None
self.stop()
# Allow the handler to re-try starting
self._exiting = False
return

self.pid = os.getpid() # In case we're restarting
self.running_event.set()

def stop(self):
if self._exiting:
return

self._exiting = True
self.running_event.clear()

try:
atexit.unregister(self.stop)
Expand All @@ -237,16 +273,27 @@ def stop(self):
pass

try:
if self.in_proxy is not None:
self.in_proxy.send(msgpack.dumps(None))
if self.in_proxy is not None and not self.in_proxy.closed:
# Give it 1.5 seconds to flush any messages in its queue
self.in_proxy.close(1500)
if self.context is not None:
self.in_proxy = None
if self.context is not None and not self.context.closed:
self.context.term()
self.context = None
if self.proxy_thread is not None and self.proxy_thread.is_alive():
# Wait for the thread to terminate
self.proxy_thread.join(5)
if self.proxy_thread.is_alive():
# Hmm.. Still alive?!
# Wait a little longer
self.proxy_thread.join(5)
self.proxy_thread = None
except (SystemExit, KeyboardInterrupt): # pragma: no cover pylint: disable=try-except-raise
# Don't write these exceptions to stderr
raise
except Exception as exc: # pragma: no cover pylint: disable=broad-except
sys.stderr.write(
"Failed to terminate ZMQHandler: {}\n{}\n".format(exc, traceback.format_exc(exc))
"Failed to terminate ZMQHandler: {}\n{}\n".format(exc, traceback.format_exc())
)
sys.stderr.flush()
raise
Expand Down Expand Up @@ -274,9 +321,12 @@ def prepare(self, record):
return msgpack.dumps(record.__dict__, use_bin_type=True)
except TypeError as exc:
# Failed to serialize something with msgpack
logging.getLogger(__name__).error(
"Failed to serialize log record: %s.\n%s", exc, pprint.pformat(record.__dict__)
sys.stderr.write(
"Failed to serialize log record:{}.\n{}\nLog Record:\n{}\n".format(
exc, traceback.format_exc(), pprint.pformat(record.__dict__)
)
)
sys.stderr.flush()
self.handleError(record)

def emit(self, record):
Expand All @@ -300,12 +350,14 @@ def emit(self, record):
try:
msg = self.prepare(record)
self.in_proxy.send(msg)
except SystemExit:
pass
except (SystemExit, KeyboardInterrupt): # pragma: no cover pylint: disable=try-except-raise
# Catch and raise SystemExit and KeyboardInterrupt so that we can handle
# all other exceptions below
raise
except Exception: # pragma: no cover pylint: disable=broad-except
self.handleError(record)

def _proxy_logs_target(self, socket_bind_event):
def _proxy_logs_target(self, socket_bind_event, running_event):
context = zmq.Context()
out_proxy = pusher = None
try:
Expand All @@ -318,9 +370,7 @@ def _proxy_logs_target(self, socket_bind_event):
out_proxy.close(1000)
context.term()
sys.stderr.write(
"Failed to bind the ZMQ PAIR socket: {}\n{}\n".format(
exc, traceback.format_exc(exc)
)
"Failed to bind the ZMQ PAIR socket: {}\n{}\n".format(exc, traceback.format_exc())
)
sys.stderr.flush()
return
Expand All @@ -335,31 +385,42 @@ def _proxy_logs_target(self, socket_bind_event):
context.term()
sys.stderr.write(
"Failed to connect the ZMQ PUSH socket: {}\n{}\n".format(
exc, traceback.format_exc(exc)
exc, traceback.format_exc()
)
)
sys.stderr.flush()
return

socket_bind_event.set()

sentinel = msgpack.dumps(None)
while True:
try:
msg = out_proxy.recv()
if msg == sentinel:
# Received sentinel to stop
break
pusher.send(msg)
except zmq.ZMQError as exc:
sys.stderr.write(
"Failed to proxy log message: {}\n{}\n".format(exc, traceback.format_exc(exc))
)
try:
if running_event.wait(5) is False:
sys.stderr.write("Running event failed to set after 5 seconds. Stop processing....")
sys.stderr.flush()
break

# Close the receiving end of the PAIR proxy socket
out_proxy.close(0)
# Allow, the pusher queue to send any messages in it's queue for
# the next 1.5 seconds
pusher.close(1500)
context.term()
self.stop()
return

while running_event.is_set():
try:
msg = out_proxy.recv(zmq.NOBLOCK)
pusher.send(msg)
except zmq.error.Again:
# no messages yet, a little sleep
time.sleep(0.001)
except zmq.ZMQError as exc:
sys.stderr.write(
"Failed to proxy log message: {}\n{}\n".format(exc, traceback.format_exc())
)
sys.stderr.flush()
self.stop()
break
finally:
# Close the receiving end of the PAIR proxy socket
if not out_proxy.closed:
out_proxy.close(0)
if not pusher.closed:
# Allow the pusher socket to send any messages in its queue for
# the next 1.5 seconds
pusher.close(1500)
if not context.closed:
context.term()

0 comments on commit e23bfc3

Please sign in to comment.