From f1d0e025ec444a8311c6788dff90cf8f087e04a7 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 8 Apr 2019 16:01:34 +0000 Subject: [PATCH 1/9] Re-raise queued exceptions with traceback --- salt/transport/ipc.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 69032ff352c6..5013feb75c3a 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -11,6 +11,7 @@ import weakref import time import threading +import sys # Import 3rd-party libs import msgpack @@ -85,6 +86,11 @@ def _done_callback(self, future): self.set_exception(exc) +class IPCExceptionProxy(object): + def __init__(self, orig_info): + self.orig_info = orig_info + + class IPCServer(object): ''' A Tornado IPC server very similar to Tornado's TCPServer class @@ -694,6 +700,7 @@ def _read(self, timeout, callback=None): break except Exception as exc: log.error('Exception occurred in Subscriber while handling stream: %s', exc) + exc = IPCExceptionProxy(sys.exc_info()) self._feed_subscribers([exc]) break @@ -801,13 +808,19 @@ def read_async(self, callback, timeout=None): raise tornado.gen.Return(None) if data is None: break - elif isinstance(data, Exception): - raise data + elif isinstance(data, IPCExceptionProxy): + self.reraise(data.orig_info) elif callback: self.service.io_loop.spawn_callback(callback, data) else: raise tornado.gen.Return(data) + def reraise(self, exc_info): + if six.PY2: + raise exc_info[0], exc_info[1], exc_info[2] + else: + raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) + def read_sync(self, timeout=None): ''' Read a message from an IPC socket From 024b4b61ebcf17be0cab112c2c8a675a17260794 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 8 Apr 2019 16:04:22 +0000 Subject: [PATCH 2/9] Remove IPCClient singleton --- salt/transport/ipc.py | 65 ++++++------------------------------------- 1 file changed, 8 insertions(+), 57 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 5013feb75c3a..84f8959e7825 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -250,36 +250,7 @@ class IPCClient(object): case it is used as the port for a tcp localhost connection. ''' - - # Create singleton map between two sockets - instance_map = weakref.WeakKeyDictionary() - - def __new__(cls, socket_path, io_loop=None): - io_loop = io_loop or tornado.ioloop.IOLoop.current() - if io_loop not in IPCClient.instance_map: - IPCClient.instance_map[io_loop] = weakref.WeakValueDictionary() - loop_instance_map = IPCClient.instance_map[io_loop] - - # FIXME - key = six.text_type(socket_path) - - client = loop_instance_map.get(key) - if client is None: - log.debug('Initializing new IPCClient for path: %s', key) - client = object.__new__(cls) - # FIXME - client.__singleton_init__(io_loop=io_loop, socket_path=socket_path) - client._instance_key = key - loop_instance_map[key] = client - client._refcount = 1 - client._refcount_lock = threading.RLock() - else: - log.debug('Re-using IPCClient for %s', key) - with client._refcount_lock: - client._refcount += 1 - return client - - def __singleton_init__(self, socket_path, io_loop=None): + def __init__(self, socket_path, io_loop=None): ''' Create a new IPC client @@ -298,10 +269,6 @@ def __singleton_init__(self, socket_path, io_loop=None): encoding = 'utf-8' self.unpacker = msgpack.Unpacker(encoding=encoding) - def __init__(self, socket_path, io_loop=None): - # Handled by singleton __new__ - pass - def connected(self): return self.stream is not None and not self.stream.closed() @@ -373,16 +340,11 @@ def _connect(self, timeout=None): def __del__(self): try: - with self._refcount_lock: - # Make sure we actually close no matter if something - # went wrong with our ref counting - self._refcount = 1 - try: - self.close() - except socket.error as exc: - if exc.errno != errno.EBADF: - # If its not a bad file descriptor error, raise - raise + self.close() + except socket.error as exc: + if exc.errno != errno.EBADF: + # If its not a bad file descriptor error, raise + raise except TypeError: # This is raised when Python's GC has collected objects which # would be needed when calling self.close() @@ -414,17 +376,6 @@ def close(self): if self.stream is not None and not self.stream.closed(): self.stream.close() - # Remove the entry from the instance map so - # that a closed entry may not be reused. - # This forces this operation even if the reference - # count of the entry has not yet gone to zero. - if self.io_loop in self.__class__.instance_map: - loop_instance_map = self.__class__.instance_map[self.io_loop] - if self._instance_key in loop_instance_map: - del loop_instance_map[self._instance_key] - if not loop_instance_map: - del self.__class__.instance_map[self.io_loop] - class IPCMessageClient(IPCClient): ''' @@ -643,8 +594,8 @@ class IPCMessageSubscriberService(IPCClient): To use this refer to IPCMessageSubscriber documentation. ''' - def __singleton_init__(self, socket_path, io_loop=None): - super(IPCMessageSubscriberService, self).__singleton_init__( + def __init__(self, socket_path, io_loop=None): + super(IPCMessageSubscriberService, self).__init__( socket_path, io_loop=io_loop) self.saved_data = [] self._read_in_progress = Lock() From c80da32234a23f45f7fd4f43afe1bc19db9ca39d Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 8 Apr 2019 16:55:44 +0000 Subject: [PATCH 3/9] Fix linter issues --- salt/transport/ipc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 84f8959e7825..7f94f45f157f 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -768,7 +768,7 @@ def read_async(self, callback, timeout=None): def reraise(self, exc_info): if six.PY2: - raise exc_info[0], exc_info[1], exc_info[2] + raise exc_info[0], exc_info[1], exc_info[2] # pylint: disable=W1699 else: raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) From 6d80789cf1736f00fb9618fa0e639a4b7683e1eb Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 8 Apr 2019 17:17:20 +0000 Subject: [PATCH 4/9] Fix ipc unit tests --- tests/unit/transport/test_ipc.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/unit/transport/test_ipc.py b/tests/unit/transport/test_ipc.py index 5cc5a70ee89e..9626ab38d39a 100644 --- a/tests/unit/transport/test_ipc.py +++ b/tests/unit/transport/test_ipc.py @@ -86,13 +86,14 @@ class IPCMessageClient(BaseIPCReqCase): ''' def _get_channel(self): - channel = salt.transport.ipc.IPCMessageClient( - socket_path=self.socket_path, - io_loop=self.io_loop, - ) - channel.connect(callback=self.stop) - self.wait() - return channel + if not hasattr(self, 'channel') or self.channel is None: + self.channel = salt.transport.ipc.IPCMessageClient( + socket_path=self.socket_path, + io_loop=self.io_loop, + ) + self.channel.connect(callback=self.stop) + self.wait() + return self.channel def setUp(self): super(IPCMessageClient, self).setUp() @@ -107,6 +108,8 @@ def tearDown(self): if exc.errno != errno.EBADF: # If its not a bad file descriptor error, raise raise + finally: + self.channel = None def test_singleton(self): channel = self._get_channel() From 25f5a90bd637d9691ea5f709aad9b5d7b8125928 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Tue, 9 Apr 2019 17:58:13 +0000 Subject: [PATCH 5/9] Use six.reraise for py3 compatability --- salt/transport/ipc.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 7f94f45f157f..920951a7bf48 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -760,18 +760,12 @@ def read_async(self, callback, timeout=None): if data is None: break elif isinstance(data, IPCExceptionProxy): - self.reraise(data.orig_info) + six.reraise(*data.orig_info) elif callback: self.service.io_loop.spawn_callback(callback, data) else: raise tornado.gen.Return(data) - def reraise(self, exc_info): - if six.PY2: - raise exc_info[0], exc_info[1], exc_info[2] # pylint: disable=W1699 - else: - raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) - def read_sync(self, timeout=None): ''' Read a message from an IPC socket From c9ec8b1f9721e70317335e411987202fe4a25e5e Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 10 Apr 2019 03:32:20 +0000 Subject: [PATCH 6/9] Ensure exceptions in service future are handled --- salt/transport/ipc.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 920951a7bf48..65c090e2f7af 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -600,6 +600,7 @@ def __init__(self, socket_path, io_loop=None): self.saved_data = [] self._read_in_progress = Lock() self.handlers = weakref.WeakSet() + self.read_stream_future = None def _subscribe(self, handler): self.handlers.add(handler) @@ -627,16 +628,16 @@ def _read(self, timeout, callback=None): if timeout is None: timeout = 5 - read_stream_future = None + self.read_stream_future = None while self._has_subscribers(): - if read_stream_future is None: - read_stream_future = self.stream.read_bytes(4096, partial=True) + if self.read_stream_future is None: + self.read_stream_future = self.stream.read_bytes(4096, partial=True) try: wire_bytes = yield FutureWithTimeout(self.io_loop, - read_stream_future, + self.read_stream_future, timeout) - read_stream_future = None + self.read_stream_future = None self.unpacker.feed(wire_bytes) msgs = [msg['body'] for msg in self.unpacker] @@ -676,7 +677,7 @@ def read(self, handler, timeout=None): except Exception as exc: log.error('Exception occurred while Subscriber connecting: %s', exc) yield tornado.gen.sleep(1) - self._read(timeout) + yield self._read(timeout) def close(self): ''' @@ -684,8 +685,11 @@ def close(self): Sockets and filehandles should be closed explicitly, to prevent leaks. ''' - if not self._closing: - super(IPCMessageSubscriberService, self).close() + super(IPCMessageSubscriberService, self).close() + if self.read_stream_future is not None and self.read_stream_future.done(): + exc = self.read_stream_future.exception() + if exc and not isinstance(exc, tornado.iostream.StreamClosedError): + log.error("Read future returned exception %r", exc) def __del__(self): if IPCMessageSubscriberService in globals(): From 82150f0241107a1a02a4fa54da74c4b202c6aeb3 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 10 Apr 2019 18:01:52 +0000 Subject: [PATCH 7/9] Remove un-used import --- salt/transport/ipc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 65c090e2f7af..e0937f7a5b0b 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -10,7 +10,6 @@ import socket import weakref import time -import threading import sys # Import 3rd-party libs From 0141b7f74484abcb3d6e418df56664a72c5957cb Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 10 Apr 2019 18:20:31 +0000 Subject: [PATCH 8/9] Fix merge wart --- salt/transport/ipc.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index e0937f7a5b0b..c23af2956647 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -358,16 +358,6 @@ def close(self): if self._closing: return - if self._refcount > 1: - # Decrease refcount - with self._refcount_lock: - self._refcount -= 1 - log.debug( - 'This is not the last %s instance. Not closing yet.', - self.__class__.__name__ - ) - return - self._closing = True log.debug('Closing %s instance', self.__class__.__name__) From f45d29b222e0d9686b12a5fb55ebdaf4125a2a3f Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 10 Apr 2019 19:16:14 +0000 Subject: [PATCH 9/9] Remove un-needed test --- tests/unit/transport/test_ipc.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/tests/unit/transport/test_ipc.py b/tests/unit/transport/test_ipc.py index 9626ab38d39a..3f5ad99f8a70 100644 --- a/tests/unit/transport/test_ipc.py +++ b/tests/unit/transport/test_ipc.py @@ -123,23 +123,6 @@ def test_singleton(self): self.wait() self.assertEqual(self.payloads[0], msg) - def test_last_singleton_instance_closes(self): - channel = self._get_channel() - msg = {'foo': 'bar', 'stop': True} - log.debug('Sending msg1') - self.channel.send(msg) - self.wait() - self.assertEqual(self.payloads[0], msg) - channel.close() - # Since this is a singleton, and only the last singleton instance - # should actually close the connection, the next code should still - # work and not timeout - msg = {'bar': 'foo', 'stop': True} - log.debug('Sending msg2') - self.channel.send(msg) - self.wait() - self.assertEqual(self.payloads[1], msg) - def test_basic_send(self): msg = {'foo': 'bar', 'stop': True} self.channel.send(msg)