Skip to content

Commit

Permalink
Merge pull request #52570 from DSRCorporation/bugs/revert_51963_2019.2
Browse files Browse the repository at this point in the history
Simplify IPCClient and prevent corrupt messages (Turn 2) 2019.2
  • Loading branch information
dwoz authored Apr 17, 2019
2 parents b173406 + fabbcac commit e522ffe
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 162 deletions.
1 change: 0 additions & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def inner(fn, *iargs, **ikwargs): # pylint: disable=unused-argument
'tornado.ioloop',
'tornado.iostream',
'tornado.netutil',
'tornado.queues',
'tornado.simple_httpclient',
'tornado.stack_context',
'tornado.web',
Expand Down
265 changes: 104 additions & 161 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import errno
import logging
import socket
import weakref
import time
import sys

# Import 3rd-party libs
import msgpack
Expand All @@ -20,10 +18,9 @@
import tornado.gen
import tornado.netutil
import tornado.concurrent
import tornado.queues
from tornado.locks import Lock
from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError
from tornado.iostream import IOStream
from tornado.iostream import IOStream, StreamClosedError
# Import Salt libs
import salt.transport.client
import salt.transport.frame
Expand Down Expand Up @@ -85,11 +82,6 @@ def _done_callback(self, future):
self.set_exception(exc)


class IPCExceptionProxy(object):
def __init__(self, orig_info):
self.orig_info = orig_info


class IPCServer(object):
'''
A Tornado IPC server very similar to Tornado's TCPServer class
Expand Down Expand Up @@ -184,7 +176,7 @@ def return_message(msg):
for framed_msg in unpacker:
body = framed_msg['body']
self.io_loop.spawn_callback(self.payload_handler, body, write_callback(stream, framed_msg['head']))
except tornado.iostream.StreamClosedError:
except StreamClosedError:
log.trace('Client disconnected from IPC %s', self.socket_path)
break
except socket.error as exc:
Expand Down Expand Up @@ -508,7 +500,7 @@ def start(self):
def _write(self, stream, pack):
try:
yield stream.write(pack)
except tornado.iostream.StreamClosedError:
except StreamClosedError:
log.trace('Client disconnected from IPC %s', self.socket_path)
self.streams.discard(stream)
except Exception as exc:
Expand Down Expand Up @@ -574,121 +566,11 @@ def __del__(self):
pass


class IPCMessageSubscriberService(IPCClient):
'''
IPC message subscriber service that is a standalone singleton class starting once for a number
of IPCMessageSubscriber instances feeding all of them with data. It closes automatically when
there are no more subscribers.
To use this refer to IPCMessageSubscriber documentation.
'''
def __init__(self, socket_path, io_loop=None):
super(IPCMessageSubscriberService, self).__init__(
socket_path, io_loop=io_loop)
self.saved_data = []
self._read_in_progress = Lock()
self.handlers = weakref.WeakSet()
self.read_stream_future = None

def _subscribe(self, handler):
self.handlers.add(handler)

def unsubscribe(self, handler):
self.handlers.discard(handler)

def _has_subscribers(self):
return bool(self.handlers)

def _feed_subscribers(self, data):
for subscriber in self.handlers:
subscriber._feed(data)

@tornado.gen.coroutine
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)

log.debug('IPC Subscriber Service is starting reading')
# If timeout is not specified we need to set some here to make the service able to check
# is there any handler waiting for data.
if timeout is None:
timeout = 5

self.read_stream_future = None
while self._has_subscribers():
if self.read_stream_future is None:
self.read_stream_future = self.stream.read_bytes(4096, partial=True)

try:
wire_bytes = yield FutureWithTimeout(self.io_loop,
self.read_stream_future,
timeout)
self.read_stream_future = None

self.unpacker.feed(wire_bytes)
msgs = [msg['body'] for msg in self.unpacker]
self._feed_subscribers(msgs)
except TornadoTimeoutError:
# Continue checking are there alive waiting handlers
# Keep 'read_stream_future' alive to wait it more in the next loop
continue
except tornado.iostream.StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._feed_subscribers([None])
break
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
exc = IPCExceptionProxy(sys.exc_info())
self._feed_subscribers([exc])
break

log.debug('IPC Subscriber Service is stopping due to a lack of subscribers')
self._read_in_progress.release()
raise tornado.gen.Return(None)

@tornado.gen.coroutine
def read(self, handler, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.
:param callback: A callback with the received data
'''
self._subscribe(handler)
while not self.connected():
try:
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read(timeout)

def close(self):
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
super(IPCMessageSubscriberService, self).close()
if self.read_stream_future is not None and self.read_stream_future.done():
exc = self.read_stream_future.exception()
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
log.error("Read future returned exception %r", exc)

def __del__(self):
if IPCMessageSubscriberService in globals():
self.close()


class IPCMessageSubscriber(object):
class IPCMessageSubscriber(IPCClient):
'''
Salt IPC message subscriber
Create or reuse an IPC client to receive messages from IPC publisher
Create an IPC client to receive messages from IPC publisher
An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
This example assumes an already running IPCMessagePublisher.
Expand Down Expand Up @@ -718,60 +600,121 @@ class IPCMessageSubscriber(object):
package = ipc_subscriber.read_sync()
'''
def __init__(self, socket_path, io_loop=None):
self.service = IPCMessageSubscriberService(socket_path, io_loop)
self.queue = tornado.queues.Queue()

def connected(self):
return self.service.connected()

def connect(self, callback=None, timeout=None):
return self.service.connect(callback=callback, timeout=timeout)
super(IPCMessageSubscriber, self).__init__(
socket_path, io_loop=io_loop)
self._read_stream_future = None
self._saved_data = []
self._read_in_progress = Lock()

@tornado.gen.coroutine
def _feed(self, msgs):
for msg in msgs:
yield self.queue.put(msg)
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0.00000001)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)

@tornado.gen.coroutine
def read_async(self, callback, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.
log.debug('IPC Subscriber is starting reading')
exc_to_raise = None
ret = None
try:
while True:
if self._read_stream_future is None:
self._read_stream_future = self.stream.read_bytes(4096, partial=True)

:param callback: A callback with the received data
'''
self.service.read(self)
while True:
try:
if timeout is not None:
deadline = time.time() + timeout
if timeout is None:
wire_bytes = yield self._read_stream_future
else:
deadline = None
data = yield self.queue.get(timeout=deadline)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)
if data is None:
break
elif isinstance(data, IPCExceptionProxy):
six.reraise(*data.orig_info)
elif callback:
self.service.io_loop.spawn_callback(callback, data)
else:
raise tornado.gen.Return(data)
wire_bytes = yield FutureWithTimeout(self.io_loop,
self._read_stream_future,
timeout)
self._read_stream_future = None

# Remove the timeout once we get some data or an exception
# occurs. We will assume that the rest of the data is already
# there or is coming soon if an exception doesn't occur.
timeout = None

self.unpacker.feed(wire_bytes)
first_sync_msg = True
for framed_msg in self.unpacker:
if callback:
self.io_loop.spawn_callback(callback, framed_msg['body'])
elif first_sync_msg:
ret = framed_msg['body']
first_sync_msg = False
else:
self._saved_data.append(framed_msg['body'])
if not first_sync_msg:
# We read at least one piece of data and we're on sync run
break
except TornadoTimeoutError:
# In the timeout case, just return None.
# Keep 'self._read_stream_future' alive.
ret = None
except StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._read_stream_future = None
exc_to_raise = exc
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
self._read_stream_future = None
exc_to_raise = exc

self._read_in_progress.release()

if exc_to_raise is not None:
raise exc_to_raise # pylint: disable=E0702
raise tornado.gen.Return(ret)

def read_sync(self, timeout=None):
'''
Read a message from an IPC socket
The socket must already be connected.
The associated IO Loop must NOT be running.
:param int timeout: Timeout when receiving message
:return: message data if successful. None if timed out. Will raise an
exception for all other error conditions.
'''
return self.service.io_loop.run_sync(lambda: self.read_async(None, timeout))
if self._saved_data:
return self._saved_data.pop(0)
return self.io_loop.run_sync(lambda: self._read(timeout))

@tornado.gen.coroutine
def read_async(self, callback):
'''
Asynchronously read messages and invoke a callback when they are ready.
:param callback: A callback with the received data
'''
while not self.connected():
try:
yield self.connect(timeout=5)
except StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read(None, callback)

def close(self):
self.service.unsubscribe(self)
self.service.close()
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if self._closing:
return
super(IPCMessageSubscriber, self).close()
# This will prevent this message from showing up:
# '[ERROR ] Future exception was never retrieved:
# StreamClosedError'
if self._read_stream_future is not None and self._read_stream_future.done():
exc = self._read_stream_future.exception()
if exc and not isinstance(exc, StreamClosedError):
log.error("Read future returned exception %r", exc)

def __del__(self):
self.close()
if IPCMessageSubscriber in globals():
self.close()
13 changes: 13 additions & 0 deletions tests/unit/transport/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,16 @@ def handler(raw):
self.assertEqual(len(call_cnt), 2)
self.assertEqual(call_cnt[0], 'TEST')
self.assertEqual(call_cnt[1], 'TEST')

def test_sync_reading(self):
# To be completely fair let's create 2 clients.
client1 = self.sub_channel
client2 = self._get_sub_channel()
call_cnt = []

# Now let both waiting data at once
self.pub_channel.publish('TEST')
ret1 = client1.read_sync()
ret2 = client2.read_sync()
self.assertEqual(ret1, 'TEST')
self.assertEqual(ret2, 'TEST')

0 comments on commit e522ffe

Please sign in to comment.