Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify IPCClient and prevent corrupt messages (Turn 2) 2019.2 #52570

Merged
merged 5 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def inner(fn, *iargs, **ikwargs): # pylint: disable=unused-argument
'tornado.ioloop',
'tornado.iostream',
'tornado.netutil',
'tornado.queues',
'tornado.simple_httpclient',
'tornado.stack_context',
'tornado.web',
Expand Down
265 changes: 104 additions & 161 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import errno
import logging
import socket
import weakref
import time
import sys

# Import 3rd-party libs
import msgpack
Expand All @@ -20,10 +18,9 @@
import tornado.gen
import tornado.netutil
import tornado.concurrent
import tornado.queues
from tornado.locks import Lock
from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError
from tornado.iostream import IOStream
from tornado.iostream import IOStream, StreamClosedError
# Import Salt libs
import salt.transport.client
import salt.transport.frame
Expand Down Expand Up @@ -85,11 +82,6 @@ def _done_callback(self, future):
self.set_exception(exc)


class IPCExceptionProxy(object):
def __init__(self, orig_info):
self.orig_info = orig_info


class IPCServer(object):
'''
A Tornado IPC server very similar to Tornado's TCPServer class
Expand Down Expand Up @@ -184,7 +176,7 @@ def return_message(msg):
for framed_msg in unpacker:
body = framed_msg['body']
self.io_loop.spawn_callback(self.payload_handler, body, write_callback(stream, framed_msg['head']))
except tornado.iostream.StreamClosedError:
except StreamClosedError:
log.trace('Client disconnected from IPC %s', self.socket_path)
break
except socket.error as exc:
Expand Down Expand Up @@ -509,7 +501,7 @@ def start(self):
def _write(self, stream, pack):
try:
yield stream.write(pack)
except tornado.iostream.StreamClosedError:
except StreamClosedError:
log.trace('Client disconnected from IPC %s', self.socket_path)
self.streams.discard(stream)
except Exception as exc:
Expand Down Expand Up @@ -575,121 +567,11 @@ def __del__(self):
pass


class IPCMessageSubscriberService(IPCClient):
'''
IPC message subscriber service that is a standalone singleton class starting once for a number
of IPCMessageSubscriber instances feeding all of them with data. It closes automatically when
there are no more subscribers.

To use this refer to IPCMessageSubscriber documentation.
'''
def __init__(self, socket_path, io_loop=None):
super(IPCMessageSubscriberService, self).__init__(
socket_path, io_loop=io_loop)
self.saved_data = []
self._read_in_progress = Lock()
self.handlers = weakref.WeakSet()
self.read_stream_future = None

def _subscribe(self, handler):
self.handlers.add(handler)

def unsubscribe(self, handler):
self.handlers.discard(handler)

def _has_subscribers(self):
return bool(self.handlers)

def _feed_subscribers(self, data):
for subscriber in self.handlers:
subscriber._feed(data)

@tornado.gen.coroutine
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)

log.debug('IPC Subscriber Service is starting reading')
# If timeout is not specified we need to set some here to make the service able to check
# is there any handler waiting for data.
if timeout is None:
timeout = 5

self.read_stream_future = None
while self._has_subscribers():
if self.read_stream_future is None:
self.read_stream_future = self.stream.read_bytes(4096, partial=True)

try:
wire_bytes = yield FutureWithTimeout(self.io_loop,
self.read_stream_future,
timeout)
self.read_stream_future = None

self.unpacker.feed(wire_bytes)
msgs = [msg['body'] for msg in self.unpacker]
self._feed_subscribers(msgs)
except TornadoTimeoutError:
# Continue checking are there alive waiting handlers
# Keep 'read_stream_future' alive to wait it more in the next loop
continue
except tornado.iostream.StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._feed_subscribers([None])
break
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
exc = IPCExceptionProxy(sys.exc_info())
self._feed_subscribers([exc])
break

log.debug('IPC Subscriber Service is stopping due to a lack of subscribers')
self._read_in_progress.release()
raise tornado.gen.Return(None)

@tornado.gen.coroutine
def read(self, handler, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.

:param callback: A callback with the received data
'''
self._subscribe(handler)
while not self.connected():
try:
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read(timeout)

def close(self):
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
super(IPCMessageSubscriberService, self).close()
if self.read_stream_future is not None and self.read_stream_future.done():
exc = self.read_stream_future.exception()
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
log.error("Read future returned exception %r", exc)

def __del__(self):
if IPCMessageSubscriberService in globals():
self.close()


class IPCMessageSubscriber(object):
class IPCMessageSubscriber(IPCClient):
'''
Salt IPC message subscriber

Create or reuse an IPC client to receive messages from IPC publisher
Create an IPC client to receive messages from IPC publisher

An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
This example assumes an already running IPCMessagePublisher.
Expand Down Expand Up @@ -719,60 +601,121 @@ class IPCMessageSubscriber(object):
package = ipc_subscriber.read_sync()
'''
def __init__(self, socket_path, io_loop=None):
self.service = IPCMessageSubscriberService(socket_path, io_loop)
self.queue = tornado.queues.Queue()

def connected(self):
return self.service.connected()

def connect(self, callback=None, timeout=None):
return self.service.connect(callback=callback, timeout=timeout)
super(IPCMessageSubscriber, self).__init__(
socket_path, io_loop=io_loop)
self._read_stream_future = None
self._saved_data = []
self._read_in_progress = Lock()

@tornado.gen.coroutine
def _feed(self, msgs):
for msg in msgs:
yield self.queue.put(msg)
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0.00000001)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)

@tornado.gen.coroutine
def read_async(self, callback, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.
log.debug('IPC Subscriber is starting reading')
exc_to_raise = None
ret = None
try:
while True:
if self._read_stream_future is None:
self._read_stream_future = self.stream.read_bytes(4096, partial=True)

:param callback: A callback with the received data
'''
self.service.read(self)
while True:
try:
if timeout is not None:
deadline = time.time() + timeout
if timeout is None:
wire_bytes = yield self._read_stream_future
else:
deadline = None
data = yield self.queue.get(timeout=deadline)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)
if data is None:
break
elif isinstance(data, IPCExceptionProxy):
six.reraise(*data.orig_info)
elif callback:
self.service.io_loop.spawn_callback(callback, data)
else:
raise tornado.gen.Return(data)
wire_bytes = yield FutureWithTimeout(self.io_loop,
self._read_stream_future,
timeout)
self._read_stream_future = None

# Remove the timeout once we get some data or an exception
# occurs. We will assume that the rest of the data is already
# there or is coming soon if an exception doesn't occur.
timeout = None

self.unpacker.feed(wire_bytes)
first_sync_msg = True
for framed_msg in self.unpacker:
if callback:
self.io_loop.spawn_callback(callback, framed_msg['body'])
elif first_sync_msg:
ret = framed_msg['body']
first_sync_msg = False
else:
self._saved_data.append(framed_msg['body'])
if not first_sync_msg:
# We read at least one piece of data and we're on sync run
break
except TornadoTimeoutError:
# In the timeout case, just return None.
# Keep 'self._read_stream_future' alive.
ret = None
except StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._read_stream_future = None
exc_to_raise = exc
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
self._read_stream_future = None
exc_to_raise = exc

self._read_in_progress.release()

if exc_to_raise is not None:
raise exc_to_raise # pylint: disable=E0702
raise tornado.gen.Return(ret)

def read_sync(self, timeout=None):
'''
Read a message from an IPC socket

The socket must already be connected.
The associated IO Loop must NOT be running.
:param int timeout: Timeout when receiving message
:return: message data if successful. None if timed out. Will raise an
exception for all other error conditions.
'''
return self.service.io_loop.run_sync(lambda: self.read_async(None, timeout))
if self._saved_data:
return self._saved_data.pop(0)
return self.io_loop.run_sync(lambda: self._read(timeout))

@tornado.gen.coroutine
def read_async(self, callback):
'''
Asynchronously read messages and invoke a callback when they are ready.

:param callback: A callback with the received data
'''
while not self.connected():
try:
yield self.connect(timeout=5)
except StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read(None, callback)

def close(self):
self.service.unsubscribe(self)
self.service.close()
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if self._closing:
return
super(IPCMessageSubscriber, self).close()
# This will prevent this message from showing up:
# '[ERROR ] Future exception was never retrieved:
# StreamClosedError'
if self._read_stream_future is not None and self._read_stream_future.done():
exc = self._read_stream_future.exception()
if exc and not isinstance(exc, StreamClosedError):
log.error("Read future returned exception %r", exc)

def __del__(self):
self.close()
if IPCMessageSubscriber in globals():
self.close()
13 changes: 13 additions & 0 deletions tests/unit/transport/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,16 @@ def handler(raw):
self.assertEqual(len(call_cnt), 2)
self.assertEqual(call_cnt[0], 'TEST')
self.assertEqual(call_cnt[1], 'TEST')

def test_sync_reading(self):
# To be completely fair let's create 2 clients.
client1 = self.sub_channel
client2 = self._get_sub_channel()
call_cnt = []

# Now let both waiting data at once
self.pub_channel.publish('TEST')
ret1 = client1.read_sync()
ret2 = client2.read_sync()
self.assertEqual(ret1, 'TEST')
self.assertEqual(ret2, 'TEST')