From 69e9416612a2f6185962e0dd9af9fab67beac60f Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Tue, 16 Apr 2019 15:49:28 +0300 Subject: [PATCH 1/5] Revert "Minor: Fix typo in docstring" This reverts commit 37aeba314330a5cefdf9ca1d5ce069bc790e692f. --- salt/transport/ipc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 12d58fdf374c..a853b258f4d3 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -581,7 +581,7 @@ class IPCMessageSubscriberService(IPCClient): of IPCMessageSubscriber instances feeding all of them with data. It closes automatically when there are no more subscribers. - To use this refer to IPCMessageSubscriber documentation. + To use this rever to IPCMessageSubscriber documentation. ''' def __init__(self, socket_path, io_loop=None): super(IPCMessageSubscriberService, self).__init__( From 7f26e764fce51eafee1971274e6c693bc5de87ee Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Tue, 16 Apr 2019 15:50:50 +0300 Subject: [PATCH 2/5] Revert "Update doc conf with the new import `tornado.queues`" This reverts commit 684bf584f68bef5d1965e81494dfbd00f5c46542. --- doc/conf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/conf.py b/doc/conf.py index 1f79530f24e5..0f31c080b472 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -132,7 +132,6 @@ def inner(fn, *iargs, **ikwargs): # pylint: disable=unused-argument 'tornado.ioloop', 'tornado.iostream', 'tornado.netutil', - 'tornado.queues', 'tornado.simple_httpclient', 'tornado.stack_context', 'tornado.web', From 7cf6d549f2233b40ea3d8adf082c9f3970e5c932 Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Tue, 16 Apr 2019 15:51:49 +0300 Subject: [PATCH 3/5] Revert "Support parallel work of multiple IPCMEssageSubscribers in one process" This reverts commit 710ab50624b16012d54485beeff151ff5940846a. 
--- salt/transport/ipc.py | 281 +++++++++++++++++++----------------------- 1 file changed, 128 insertions(+), 153 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index a853b258f4d3..33f5e58e7f1d 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -20,8 +20,7 @@ import tornado.gen import tornado.netutil import tornado.concurrent -import tornado.queues -from tornado.locks import Lock +from tornado.locks import Semaphore from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError from tornado.iostream import IOStream # Import Salt libs @@ -575,121 +574,11 @@ def __del__(self): pass -class IPCMessageSubscriberService(IPCClient): - ''' - IPC message subscriber service that is a standalone singleton class starting once for a number - of IPCMessageSubscriber instances feeding all of them with data. It closes automatically when - there are no more subscribers. - - To use this rever to IPCMessageSubscriber documentation. - ''' - def __init__(self, socket_path, io_loop=None): - super(IPCMessageSubscriberService, self).__init__( - socket_path, io_loop=io_loop) - self.saved_data = [] - self._read_in_progress = Lock() - self.handlers = weakref.WeakSet() - self.read_stream_future = None - - def _subscribe(self, handler): - self.handlers.add(handler) - - def unsubscribe(self, handler): - self.handlers.discard(handler) - - def _has_subscribers(self): - return bool(self.handlers) - - def _feed_subscribers(self, data): - for subscriber in self.handlers: - subscriber._feed(data) - - @tornado.gen.coroutine - def _read(self, timeout, callback=None): - try: - yield self._read_in_progress.acquire(timeout=0) - except tornado.gen.TimeoutError: - raise tornado.gen.Return(None) - - log.debug('IPC Subscriber Service is starting reading') - # If timeout is not specified we need to set some here to make the service able to check - # is there any handler waiting for data. 
- if timeout is None: - timeout = 5 - - self.read_stream_future = None - while self._has_subscribers(): - if self.read_stream_future is None: - self.read_stream_future = self.stream.read_bytes(4096, partial=True) - - try: - wire_bytes = yield FutureWithTimeout(self.io_loop, - self.read_stream_future, - timeout) - self.read_stream_future = None - - self.unpacker.feed(wire_bytes) - msgs = [msg['body'] for msg in self.unpacker] - self._feed_subscribers(msgs) - except TornadoTimeoutError: - # Continue checking are there alive waiting handlers - # Keep 'read_stream_future' alive to wait it more in the next loop - continue - except tornado.iostream.StreamClosedError as exc: - log.trace('Subscriber disconnected from IPC %s', self.socket_path) - self._feed_subscribers([None]) - break - except Exception as exc: - log.error('Exception occurred in Subscriber while handling stream: %s', exc) - exc = IPCExceptionProxy(sys.exc_info()) - self._feed_subscribers([exc]) - break - - log.debug('IPC Subscriber Service is stopping due to a lack of subscribers') - self._read_in_progress.release() - raise tornado.gen.Return(None) - - @tornado.gen.coroutine - def read(self, handler, timeout=None): - ''' - Asynchronously read messages and invoke a callback when they are ready. - - :param callback: A callback with the received data - ''' - self._subscribe(handler) - while not self.connected(): - try: - yield self.connect(timeout=5) - except tornado.iostream.StreamClosedError: - log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path) - yield tornado.gen.sleep(1) - except Exception as exc: - log.error('Exception occurred while Subscriber connecting: %s', exc) - yield tornado.gen.sleep(1) - yield self._read(timeout) - - def close(self): - ''' - Routines to handle any cleanup before the instance shuts down. - Sockets and filehandles should be closed explicitly, to prevent - leaks. 
- ''' - super(IPCMessageSubscriberService, self).close() - if self.read_stream_future is not None and self.read_stream_future.done(): - exc = self.read_stream_future.exception() - if exc and not isinstance(exc, tornado.iostream.StreamClosedError): - log.error("Read future returned exception %r", exc) - - def __del__(self): - if IPCMessageSubscriberService in globals(): - self.close() - - -class IPCMessageSubscriber(object): +class IPCMessageSubscriber(IPCClient): ''' Salt IPC message subscriber - Create or reuse an IPC client to receive messages from IPC publisher + Create an IPC client to receive messages from IPC publisher An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher. This example assumes an already running IPCMessagePublisher. @@ -718,61 +607,147 @@ class IPCMessageSubscriber(object): # Wait for some data package = ipc_subscriber.read_sync() ''' - def __init__(self, socket_path, io_loop=None): - self.service = IPCMessageSubscriberService(socket_path, io_loop) - self.queue = tornado.queues.Queue() - - def connected(self): - return self.service.connected() - - def connect(self, callback=None, timeout=None): - return self.service.connect(callback=callback, timeout=timeout) + def __singleton_init__(self, socket_path, io_loop=None): + super(IPCMessageSubscriber, self).__singleton_init__( + socket_path, io_loop=io_loop) + self._read_sync_future = None + self._read_stream_future = None + self._sync_ioloop_running = False + self.saved_data = [] + self._sync_read_in_progress = Semaphore() @tornado.gen.coroutine - def _feed(self, msgs): - for msg in msgs: - yield self.queue.put(msg) + def _read_sync(self, timeout): + yield self._sync_read_in_progress.acquire() + exc_to_raise = None + ret = None - @tornado.gen.coroutine - def read_async(self, callback, timeout=None): - ''' - Asynchronously read messages and invoke a callback when they are ready. 
+ try: + while True: + if self._read_stream_future is None: + self._read_stream_future = self.stream.read_bytes(4096, partial=True) - :param callback: A callback with the received data - ''' - self.service.read(self) - while True: - try: - if timeout is not None: - deadline = time.time() + timeout + if timeout is None: + wire_bytes = yield self._read_stream_future else: - deadline = None - data = yield self.queue.get(timeout=deadline) - except tornado.gen.TimeoutError: - raise tornado.gen.Return(None) - if data is None: - break - elif isinstance(data, IPCExceptionProxy): - six.reraise(*data.orig_info) - elif callback: - self.service.io_loop.spawn_callback(callback, data) - else: - raise tornado.gen.Return(data) + future_with_timeout = FutureWithTimeout( + self.io_loop, self._read_stream_future, timeout) + wire_bytes = yield future_with_timeout + + self._read_stream_future = None + + # Remove the timeout once we get some data or an exception + # occurs. We will assume that the rest of the data is already + # there or is coming soon if an exception doesn't occur. + timeout = None + + self.unpacker.feed(wire_bytes) + first = True + for framed_msg in self.unpacker: + if first: + ret = framed_msg['body'] + first = False + else: + self.saved_data.append(framed_msg['body']) + if not first: + # We read at least one piece of data + break + except TornadoTimeoutError: + # In the timeout case, just return None. + # Keep 'self._read_stream_future' alive. + ret = None + except tornado.iostream.StreamClosedError as exc: + log.trace('Subscriber disconnected from IPC %s', self.socket_path) + self._read_stream_future = None + exc_to_raise = exc + except Exception as exc: + log.error('Exception occurred in Subscriber while handling stream: %s', exc) + self._read_stream_future = None + exc_to_raise = exc + + if self._sync_ioloop_running: + # Stop the IO Loop so that self.io_loop.start() will return in + # read_sync(). 
+ self.io_loop.spawn_callback(self.io_loop.stop) + + if exc_to_raise is not None: + raise exc_to_raise # pylint: disable=E0702 + self._sync_read_in_progress.release() + raise tornado.gen.Return(ret) def read_sync(self, timeout=None): ''' Read a message from an IPC socket + The socket must already be connected. The associated IO Loop must NOT be running. :param int timeout: Timeout when receiving message :return: message data if successful. None if timed out. Will raise an exception for all other error conditions. ''' - return self.service.io_loop.run_sync(lambda: self.read_async(None, timeout)) + if self.saved_data: + return self.saved_data.pop(0) + + self._sync_ioloop_running = True + self._read_sync_future = self._read_sync(timeout) + self.io_loop.start() + self._sync_ioloop_running = False + + ret_future = self._read_sync_future + self._read_sync_future = None + return ret_future.result() + + @tornado.gen.coroutine + def _read_async(self, callback): + while not self.stream.closed(): + try: + self._read_stream_future = self.stream.read_bytes(4096, partial=True) + wire_bytes = yield self._read_stream_future + self._read_stream_future = None + self.unpacker.feed(wire_bytes) + for framed_msg in self.unpacker: + body = framed_msg['body'] + self.io_loop.spawn_callback(callback, body) + except tornado.iostream.StreamClosedError: + log.trace('Subscriber disconnected from IPC %s', self.socket_path) + break + except Exception as exc: + log.error('Exception occurred while Subscriber handling stream: %s', exc) + + @tornado.gen.coroutine + def read_async(self, callback): + ''' + Asynchronously read messages and invoke a callback when they are ready. 
+ + :param callback: A callback with the received data + ''' + while not self.connected(): + try: + yield self.connect(timeout=5) + except tornado.iostream.StreamClosedError: + log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path) + yield tornado.gen.sleep(1) + except Exception as exc: + log.error('Exception occurred while Subscriber connecting: %s', exc) + yield tornado.gen.sleep(1) + yield self._read_async(callback) def close(self): - self.service.unsubscribe(self) - self.service.close() + ''' + Routines to handle any cleanup before the instance shuts down. + Sockets and filehandles should be closed explicitly, to prevent + leaks. + ''' + if not self._closing: + IPCClient.close(self) + # This will prevent this message from showing up: + # '[ERROR ] Future exception was never retrieved: + # StreamClosedError' + if self._read_sync_future is not None and self._read_sync_future.done(): + self._read_sync_future.exception() + if self._read_stream_future is not None and self._read_stream_future.done(): + self._read_stream_future.exception() def __del__(self): - self.close() + if IPCMessageSubscriber in globals(): + self.close() From 9c85734a3cc504f35188a2376f6c96bd4806222f Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Tue, 16 Apr 2019 19:23:43 +0300 Subject: [PATCH 4/5] Drop singleton from IPCClient --- salt/transport/ipc.py | 109 +++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 71 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 33f5e58e7f1d..fcb882fc29ac 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -8,9 +8,7 @@ import errno import logging import socket -import weakref import time -import sys # Import 3rd-party libs import msgpack @@ -20,7 +18,7 @@ import tornado.gen import tornado.netutil import tornado.concurrent -from tornado.locks import Semaphore +from tornado.locks import Lock from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError from 
tornado.iostream import IOStream # Import Salt libs @@ -84,11 +82,6 @@ def _done_callback(self, future): self.set_exception(exc) -class IPCExceptionProxy(object): - def __init__(self, orig_info): - self.orig_info = orig_info - - class IPCServer(object): ''' A Tornado IPC server very similar to Tornado's TCPServer class @@ -607,21 +600,23 @@ class IPCMessageSubscriber(IPCClient): # Wait for some data package = ipc_subscriber.read_sync() ''' - def __singleton_init__(self, socket_path, io_loop=None): - super(IPCMessageSubscriber, self).__singleton_init__( + def __init__(self, socket_path, io_loop=None): + super(IPCMessageSubscriber, self).__init__( socket_path, io_loop=io_loop) - self._read_sync_future = None self._read_stream_future = None - self._sync_ioloop_running = False - self.saved_data = [] - self._sync_read_in_progress = Semaphore() + self._saved_data = [] + self._read_in_progress = Lock() @tornado.gen.coroutine - def _read_sync(self, timeout): - yield self._sync_read_in_progress.acquire() + def _read(self, timeout, callback=None): + try: + yield self._read_in_progress.acquire(timeout=0) + except tornado.gen.TimeoutError: + raise tornado.gen.Return(None) + + log.debug('IPC Subscriber is starting reading') exc_to_raise = None ret = None - try: while True: if self._read_stream_future is None: @@ -630,10 +625,9 @@ def _read_sync(self, timeout): if timeout is None: wire_bytes = yield self._read_stream_future else: - future_with_timeout = FutureWithTimeout( - self.io_loop, self._read_stream_future, timeout) - wire_bytes = yield future_with_timeout - + wire_bytes = yield FutureWithTimeout(self.io_loop, + self._read_stream_future, + timeout) self._read_stream_future = None # Remove the timeout once we get some data or an exception @@ -642,15 +636,17 @@ def _read_sync(self, timeout): timeout = None self.unpacker.feed(wire_bytes) - first = True + first_sync_msg = True for framed_msg in self.unpacker: - if first: + if callback: + self.io_loop.spawn_callback(callback, 
framed_msg['body']) + elif first_sync_msg: ret = framed_msg['body'] - first = False + first_sync_msg = False else: - self.saved_data.append(framed_msg['body']) - if not first: - # We read at least one piece of data + self._saved_data.append(framed_msg['body']) + if not first_sync_msg: + # We read at least one piece of data and we're on sync run break except TornadoTimeoutError: # In the timeout case, just return None. @@ -665,14 +661,9 @@ def _read_sync(self, timeout): self._read_stream_future = None exc_to_raise = exc - if self._sync_ioloop_running: - # Stop the IO Loop so that self.io_loop.start() will return in - # read_sync(). - self.io_loop.spawn_callback(self.io_loop.stop) - if exc_to_raise is not None: raise exc_to_raise # pylint: disable=E0702 - self._sync_read_in_progress.release() + self._read_in_progress.release() raise tornado.gen.Return(ret) def read_sync(self, timeout=None): @@ -685,34 +676,9 @@ def read_sync(self, timeout=None): :return: message data if successful. None if timed out. Will raise an exception for all other error conditions. 
''' - if self.saved_data: - return self.saved_data.pop(0) - - self._sync_ioloop_running = True - self._read_sync_future = self._read_sync(timeout) - self.io_loop.start() - self._sync_ioloop_running = False - - ret_future = self._read_sync_future - self._read_sync_future = None - return ret_future.result() - - @tornado.gen.coroutine - def _read_async(self, callback): - while not self.stream.closed(): - try: - self._read_stream_future = self.stream.read_bytes(4096, partial=True) - wire_bytes = yield self._read_stream_future - self._read_stream_future = None - self.unpacker.feed(wire_bytes) - for framed_msg in self.unpacker: - body = framed_msg['body'] - self.io_loop.spawn_callback(callback, body) - except tornado.iostream.StreamClosedError: - log.trace('Subscriber disconnected from IPC %s', self.socket_path) - break - except Exception as exc: - log.error('Exception occurred while Subscriber handling stream: %s', exc) + if self._saved_data: + return self._saved_data.pop(0) + return self.io_loop.run_sync(lambda: self._read(timeout)) @tornado.gen.coroutine def read_async(self, callback): @@ -730,7 +696,7 @@ def read_async(self, callback): except Exception as exc: log.error('Exception occurred while Subscriber connecting: %s', exc) yield tornado.gen.sleep(1) - yield self._read_async(callback) + yield self._read(None, callback) def close(self): ''' @@ -738,15 +704,16 @@ def close(self): Sockets and filehandles should be closed explicitly, to prevent leaks. 
''' - if not self._closing: - IPCClient.close(self) - # This will prevent this message from showing up: - # '[ERROR ] Future exception was never retrieved: - # StreamClosedError' - if self._read_sync_future is not None and self._read_sync_future.done(): - self._read_sync_future.exception() - if self._read_stream_future is not None and self._read_stream_future.done(): - self._read_stream_future.exception() + if self._closing: + return + super(IPCMessageSubscriber, self).close() + # This will prevent this message from showing up: + # '[ERROR ] Future exception was never retrieved: + # StreamClosedError' + if self._read_stream_future is not None and self._read_stream_future.done(): + exc = self._read_stream_future.exception() + if exc and not isinstance(exc, tornado.iostream.StreamClosedError): + log.error("Read future returned exception %r", exc) def __del__(self): if IPCMessageSubscriber in globals(): From fabbcac8e662402e551f711a138e9d1b0e3eb8e9 Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Wed, 17 Apr 2019 00:47:39 +0300 Subject: [PATCH 5/5] A couple of race conditions fixes and a test update. 
--- salt/transport/ipc.py | 17 +++++++++-------- tests/unit/transport/test_ipc.py | 13 +++++++++++++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index fcb882fc29ac..18eff89b705f 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -20,7 +20,7 @@ import tornado.concurrent from tornado.locks import Lock from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError -from tornado.iostream import IOStream +from tornado.iostream import IOStream, StreamClosedError # Import Salt libs import salt.transport.client import salt.transport.frame @@ -176,7 +176,7 @@ def return_message(msg): for framed_msg in unpacker: body = framed_msg['body'] self.io_loop.spawn_callback(self.payload_handler, body, write_callback(stream, framed_msg['head'])) - except tornado.iostream.StreamClosedError: + except StreamClosedError: log.trace('Client disconnected from IPC %s', self.socket_path) break except socket.error as exc: @@ -501,7 +501,7 @@ def start(self): def _write(self, stream, pack): try: yield stream.write(pack) - except tornado.iostream.StreamClosedError: + except StreamClosedError: log.trace('Client disconnected from IPC %s', self.socket_path) self.streams.discard(stream) except Exception as exc: @@ -610,7 +610,7 @@ def __init__(self, socket_path, io_loop=None): @tornado.gen.coroutine def _read(self, timeout, callback=None): try: - yield self._read_in_progress.acquire(timeout=0) + yield self._read_in_progress.acquire(timeout=0.00000001) except tornado.gen.TimeoutError: raise tornado.gen.Return(None) @@ -652,7 +652,7 @@ def _read(self, timeout, callback=None): # In the timeout case, just return None. # Keep 'self._read_stream_future' alive. 
ret = None - except tornado.iostream.StreamClosedError as exc: + except StreamClosedError as exc: log.trace('Subscriber disconnected from IPC %s', self.socket_path) self._read_stream_future = None exc_to_raise = exc @@ -661,9 +661,10 @@ def _read(self, timeout, callback=None): self._read_stream_future = None exc_to_raise = exc + self._read_in_progress.release() + if exc_to_raise is not None: raise exc_to_raise # pylint: disable=E0702 - self._read_in_progress.release() raise tornado.gen.Return(ret) def read_sync(self, timeout=None): @@ -690,7 +691,7 @@ def read_async(self, callback): while not self.connected(): try: yield self.connect(timeout=5) - except tornado.iostream.StreamClosedError: + except StreamClosedError: log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path) yield tornado.gen.sleep(1) except Exception as exc: @@ -712,7 +713,7 @@ def close(self): # StreamClosedError' if self._read_stream_future is not None and self._read_stream_future.done(): exc = self._read_stream_future.exception() - if exc and not isinstance(exc, tornado.iostream.StreamClosedError): + if exc and not isinstance(exc, StreamClosedError): log.error("Read future returned exception %r", exc) def __del__(self): diff --git a/tests/unit/transport/test_ipc.py b/tests/unit/transport/test_ipc.py index 3f5ad99f8a70..d7495b93c70c 100644 --- a/tests/unit/transport/test_ipc.py +++ b/tests/unit/transport/test_ipc.py @@ -253,3 +253,16 @@ def handler(raw): self.assertEqual(len(call_cnt), 2) self.assertEqual(call_cnt[0], 'TEST') self.assertEqual(call_cnt[1], 'TEST') + + def test_sync_reading(self): + # To be completely fair let's create 2 clients. + client1 = self.sub_channel + client2 = self._get_sub_channel() + call_cnt = [] + + # Now let both clients wait for data at once + self.pub_channel.publish('TEST') + ret1 = client1.read_sync() + ret2 = client2.read_sync() + self.assertEqual(ret1, 'TEST') + self.assertEqual(ret2, 'TEST')