Skip to content

Commit

Permalink
Message queue optimizations (Fixes #1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg authored Sep 16, 2023
1 parent c419fc5 commit dc6e4f5
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 96 deletions.
55 changes: 33 additions & 22 deletions src/socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback = (room, namespace, id)
else:
callback = None
await self._publish({'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})
message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id}
await self._handle_emit(message) # handle in this host
await self._publish(message) # notify other hosts

async def can_disconnect(self, sid, namespace):
if self.is_connected(sid, namespace):
Expand All @@ -76,18 +78,23 @@ async def can_disconnect(self, sid, namespace):
else:
# client is in another server, so we post request to the queue
await self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
'namespace': namespace or '/',
'host_id': self.host_id})

async def disconnect(self, sid, namespace, **kwargs):
if kwargs.get('ignore_queue'):
return await super(AsyncPubSubManager, self).disconnect(
sid, namespace=namespace)
await self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
message = {'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/', 'host_id': self.host_id}
await self._handle_disconnect(message) # handle in this host
await self._publish(message) # notify other hosts

async def close_room(self, room, namespace=None):
await self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
message = {'method': 'close_room', 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
await self._handle_close_room(message) # handle in this host
await self._publish(message) # notify other hosts

async def _publish(self, data):
"""Publish a message on the Socket.IO channel.
Expand Down Expand Up @@ -139,18 +146,21 @@ async def _return_callback(self, host_id, sid, namespace, callback_id,
*args):
# When an event callback is received, the callback is returned back
# the sender, which is identified by the host_id
await self._publish({'method': 'callback', 'host_id': host_id,
'sid': sid, 'namespace': namespace,
'id': callback_id, 'args': args})
if host_id == self.host_id:
await self.trigger_callback(sid, callback_id, args)
else:
await self._publish({'method': 'callback', 'host_id': host_id,
'sid': sid, 'namespace': namespace,
'id': callback_id, 'args': args})

async def _handle_disconnect(self, message):
await self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)

async def _handle_close_room(self, message):
await super().close_room(
room=message.get('room'), namespace=message.get('namespace'))
await super().close_room(room=message.get('room'),
namespace=message.get('namespace'))

async def _thread(self):
while True:
Expand All @@ -171,17 +181,18 @@ async def _thread(self):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
self._get_logger().debug('pubsub message: {}'.format(
data['method']))
try:
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
if data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
elif data.get('host_id') != self.host_id:
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
except asyncio.CancelledError:
raise # let the outer try/except handle it
except:
Expand Down
67 changes: 39 additions & 28 deletions src/socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,38 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback = (room, namespace, id)
else:
callback = None
self._publish({'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})
message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id}
self._handle_emit(message) # handle in this host
self._publish(message) # notify other hosts

def can_disconnect(self, sid, namespace):
if self.is_connected(sid, namespace):
# client is in this server, so we can disconnect directly
return super().can_disconnect(sid, namespace)
else:
# client is in another server, so we post request to the queue
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
message = {'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/', 'host_id': self.host_id}
self._handle_disconnect(message) # handle in this host
self._publish(message) # notify other hosts

def disconnect(self, sid, namespace=None, **kwargs):
if kwargs.get('ignore_queue'):
return super(PubSubManager, self).disconnect(
sid, namespace=namespace)
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
message = {'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/', 'host_id': self.host_id}
self._handle_disconnect(message) # handle in this host
self._publish(message) # notify other hosts

def close_room(self, room, namespace=None):
self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
message = {'method': 'close_room', 'room': room,
'namespace': namespace or '/', 'host_id': self.host_id}
self._handle_close_room(message) # handle in this host
self._publish(message) # notify other hosts

def _publish(self, data):
"""Publish a message on the Socket.IO channel.
Expand Down Expand Up @@ -116,11 +124,10 @@ def _handle_emit(self, message):
*remote_callback)
else:
callback = None
super(PubSubManager, self).emit(message['event'], message['data'],
namespace=message.get('namespace'),
room=message.get('room'),
skip_sid=message.get('skip_sid'),
callback=callback)
super().emit(message['event'], message['data'],
namespace=message.get('namespace'),
room=message.get('room'),
skip_sid=message.get('skip_sid'), callback=callback)

def _handle_callback(self, message):
if self.host_id == message.get('host_id'):
Expand All @@ -135,18 +142,21 @@ def _handle_callback(self, message):
def _return_callback(self, host_id, sid, namespace, callback_id, *args):
# When an event callback is received, the callback is returned back
# to the sender, which is identified by the host_id
self._publish({'method': 'callback', 'host_id': host_id,
'sid': sid, 'namespace': namespace, 'id': callback_id,
'args': args})
if host_id == self.host_id:
self.trigger_callback(sid, callback_id, args)
else:
self._publish({'method': 'callback', 'host_id': host_id,
'sid': sid, 'namespace': namespace,
'id': callback_id, 'args': args})

def _handle_disconnect(self, message):
self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)

def _handle_close_room(self, message):
super(PubSubManager, self).close_room(
room=message.get('room'), namespace=message.get('namespace'))
super().close_room(room=message.get('room'),
namespace=message.get('namespace'))

def _thread(self):
for message in self._listen():
Expand All @@ -165,17 +175,18 @@ def _thread(self):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
self._get_logger().debug('pubsub message: {}'.format(
data['method']))
try:
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
if data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
elif data.get('host_id') != self.host_id:
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
except:
self.server.logger.exception(
'Unknown error in pubsub listening thread')
91 changes: 68 additions & 23 deletions tests/asyncio/test_asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ def test_can_disconnect(self):
assert _run(self.pm.can_disconnect(sid, '/')) is True
_run(self.pm.can_disconnect(sid, '/foo'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': sid, 'namespace': '/foo'}
{'method': 'disconnect', 'sid': sid, 'namespace': '/foo',
'host_id': '123456'}
)

def test_disconnect(self):
_run(self.pm.disconnect('foo', '/'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': 'foo', 'namespace': '/'}
{'method': 'disconnect', 'sid': 'foo', 'namespace': '/',
'host_id': '123456'}
)

def test_disconnect_ignore_queue(self):
Expand All @@ -182,13 +184,15 @@ def test_disconnect_ignore_queue(self):
def test_close_room(self):
_run(self.pm.close_room('foo'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'close_room', 'room': 'foo', 'namespace': '/'}
{'method': 'close_room', 'room': 'foo', 'namespace': '/',
'host_id': '123456'}
)

def test_close_room_with_namespace(self):
_run(self.pm.close_room('foo', '/bar'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar'}
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar',
'host_id': '123456'}
)

def test_handle_emit(self):
Expand Down Expand Up @@ -263,8 +267,7 @@ def test_handle_emit_with_skip_sid(self):
callback=None,
)

def test_handle_emit_with_callback(self):
host_id = self.pm.host_id
def test_handle_emit_with_remote_callback(self):
with mock.patch.object(
asyncio_manager.AsyncManager, 'emit', new=AsyncMock()
) as super_emit:
Expand All @@ -275,7 +278,7 @@ def test_handle_emit_with_callback(self):
'data': 'bar',
'namespace': '/baz',
'callback': ('sid', '/baz', 123),
'host_id': '123456',
'host_id': 'x',
}
)
)
Expand All @@ -291,14 +294,40 @@ def test_handle_emit_with_callback(self):
self.pm._publish.mock.assert_called_once_with(
{
'method': 'callback',
'host_id': host_id,
'host_id': 'x',
'sid': 'sid',
'namespace': '/baz',
'id': 123,
'args': ('one', 2, 'three'),
}
)

def test_handle_emit_with_local_callback(self):
with mock.patch.object(
asyncio_manager.AsyncManager, 'emit', new=AsyncMock()
) as super_emit:
_run(
self.pm._handle_emit(
{
'event': 'foo',
'data': 'bar',
'namespace': '/baz',
'callback': ('sid', '/baz', 123),
'host_id': self.pm.host_id,
}
)
)
assert super_emit.mock.call_count == 1
assert super_emit.mock.call_args[0] == (self.pm, 'foo', 'bar')
assert super_emit.mock.call_args[1]['namespace'] == '/baz'
assert super_emit.mock.call_args[1]['room'] is None
assert super_emit.mock.call_args[1]['skip_sid'] is None
assert isinstance(
super_emit.mock.call_args[1]['callback'], functools.partial
)
_run(super_emit.mock.call_args[1]['callback']('one', 2, 'three'))
self.pm._publish.mock.assert_not_called()

def test_handle_callback(self):
host_id = self.pm.host_id
with mock.patch.object(
Expand Down Expand Up @@ -419,50 +448,66 @@ def test_background_thread(self):
self.pm._handle_callback = AsyncMock()
self.pm._handle_disconnect = AsyncMock()
self.pm._handle_close_room = AsyncMock()
host_id = self.pm.host_id

async def messages():
import pickle

yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield 'bad json'
yield b'bad pickled'

# these should not publish anything on the queue, as they come from
# the same host
yield {'method': 'emit', 'value': 'foo', 'host_id': host_id}
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
raise asyncio.CancelledError() # force the thread to exit

self.pm._listen = messages
_run(self.pm._thread())

self.pm._handle_emit.mock.assert_called_once_with(
{'method': 'emit', 'value': 'foo'}
{'method': 'emit', 'value': 'foo', 'host_id': 'x'}
)
self.pm._handle_callback.mock.assert_any_call(
{'method': 'callback', 'value': 'bar', 'host_id': 'x'}
)
self.pm._handle_callback.mock.assert_called_once_with(
{'method': 'callback', 'value': 'bar'}
self.pm._handle_callback.mock.assert_any_call(
{'method': 'callback', 'value': 'bar', 'host_id': host_id}
)
self.pm._handle_disconnect.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
)
self.pm._handle_close_room.mock.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'}
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
)

def test_background_thread_exception(self):
self.pm._handle_emit = AsyncMock(side_effect=[ValueError(),
asyncio.CancelledError])

async def messages():
yield {'method': 'emit', 'value': 'foo'}
yield {'method': 'emit', 'value': 'bar'}
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'method': 'emit', 'value': 'bar', 'host_id': 'x'}

self.pm._listen = messages
_run(self.pm._thread())

self.pm._handle_emit.mock.assert_any_call(
{'method': 'emit', 'value': 'foo'}
{'method': 'emit', 'value': 'foo', 'host_id': 'x'}
)
self.pm._handle_emit.mock.assert_called_with(
{'method': 'emit', 'value': 'bar'}
{'method': 'emit', 'value': 'bar', 'host_id': 'x'}
)
Loading

0 comments on commit dc6e4f5

Please sign in to comment.