diff --git a/src/socketio/admin.py b/src/socketio/admin.py index 58c8aff9..8d6fbcf5 100644 --- a/src/socketio/admin.py +++ b/src/socketio/admin.py @@ -77,13 +77,9 @@ def instrument(self): # track socket connection times self.sio.manager._timestamps = {} - # report socket.io connections - self.sio.manager.__connect = self.sio.manager.connect - self.sio.manager.connect = self._connect - - # report socket.io disconnection - self.sio.manager.__disconnect = self.sio.manager.disconnect - self.sio.manager.disconnect = self._disconnect + # report socket.io connections, disconnections and received events + self.sio.__trigger_event = self.sio._trigger_event + self.sio._trigger_event = self._trigger_event # report join rooms self.sio.manager.__basic_enter_room = \ @@ -99,10 +95,6 @@ def instrument(self): self.sio.manager.__emit = self.sio.manager.emit self.sio.manager.emit = self._emit - # report receive events - self.sio.__handle_event_internal = self.sio._handle_event_internal - self.sio._handle_event_internal = self._handle_event_internal - # report engine.io connections self.sio.eio.on('connect', self._handle_eio_connect) self.sio.eio.on('disconnect', self._handle_eio_disconnect) @@ -128,14 +120,12 @@ def instrument(self): def uninstrument(self): # pragma: no cover if self.mode == 'development': - self.sio.manager.connect = self.sio.manager.__connect - self.sio.manager.disconnect = self.sio.manager.__disconnect + self.sio._trigger_event = self.sio.__trigger_event self.sio.manager.basic_enter_room = \ self.sio.manager.__basic_enter_room self.sio.manager.basic_leave_room = \ self.sio.manager.__basic_leave_room self.sio.manager.emit = self.sio.manager.__emit - self.sio._handle_event_internal = self.sio.__handle_event_internal self.sio.eio._ok = self.sio.eio.__ok from engineio.socket import Socket @@ -205,26 +195,34 @@ def shutdown(self): self.stop_stats_event.set() self.stats_task.join() - def _connect(self, eio_sid, namespace): - sid = self.sio.manager.__connect(eio_sid, namespace) + def _trigger_event(self, event, namespace, *args): t = time.time() - self.sio.manager._timestamps[sid] = t - serialized_socket = self.serialize_socket(sid, namespace, eio_sid) - self.sio.emit('socket_connected', ( - serialized_socket, - datetime.utcfromtimestamp(t).isoformat() + 'Z', - ), namespace=self.admin_namespace) - return sid - - def _disconnect(self, sid, namespace, **kwargs): - del self.sio.manager._timestamps[sid] - self.sio.emit('socket_disconnected', ( - namespace, - sid, - 'N/A', - datetime.utcnow().isoformat() + 'Z', - ), namespace=self.admin_namespace) - return self.sio.manager.__disconnect(sid, namespace, **kwargs) + sid = args[0] + if event == 'connect': + eio_sid = self.sio.manager.eio_sid_from_sid(sid, namespace) + self.sio.manager._timestamps[sid] = t + serialized_socket = self.serialize_socket(sid, namespace, eio_sid) + self.sio.emit('socket_connected', ( + serialized_socket, + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + elif event == 'disconnect': + del self.sio.manager._timestamps[sid] + reason = args[1] + self.sio.emit('socket_disconnected', ( + namespace, + sid, + reason, + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + else: + self.sio.emit('event_received', ( + namespace, + sid, + (event, *args[1:]), + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + return self.sio.__trigger_event(event, namespace, *args) def _check_for_upgrade(self, eio_sid, sid, namespace): # pragma: no cover for _ in range(5): @@ -269,7 +267,7 @@ def _emit(self, event, data, namespace, room=None, skip_sid=None, **kwargs) if namespace != self.admin_namespace: event_data = [event] + list(data) if isinstance(data, tuple) \ - else [data] + else [event, data] if not isinstance(skip_sid, list): # pragma: no branch skip_sid = [skip_sid] for sid, _ in self.sio.manager.get_participants(namespace, room): @@ -282,18 +280,6 @@ def _emit(self, event, data, namespace, room=None, skip_sid=None, ), namespace=self.admin_namespace) return ret - def _handle_event_internal(self, server, sid, eio_sid, data, namespace, - id): - ret = self.sio.__handle_event_internal(server, sid, eio_sid, data, - namespace, id) - self.sio.emit('event_received', ( - namespace, - sid, - data, - datetime.utcnow().isoformat() + 'Z', - ), namespace=self.admin_namespace) - return ret - def _handle_eio_connect(self, eio_sid, environ): if self.stop_stats_event is None: self.stop_stats_event = self.sio.eio.create_event() @@ -303,9 +289,9 @@ def _handle_eio_connect(self, eio_sid, environ): self.event_buffer.push('rawConnection') return self.sio._handle_eio_connect(eio_sid, environ) - def _handle_eio_disconnect(self, eio_sid): + def _handle_eio_disconnect(self, eio_sid, reason): self.event_buffer.push('rawDisconnection') - return self.sio._handle_eio_disconnect(eio_sid) + return self.sio._handle_eio_disconnect(eio_sid, reason) def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): ret = self.sio.eio.__ok(packets=packets, headers=headers, diff --git a/src/socketio/async_admin.py b/src/socketio/async_admin.py index 162c5660..68ea289c 100644 --- a/src/socketio/async_admin.py +++ b/src/socketio/async_admin.py @@ -58,13 +58,9 @@ def instrument(self): # track socket connection times self.sio.manager._timestamps = {} - # report socket.io connections - self.sio.manager.__connect = self.sio.manager.connect - self.sio.manager.connect = self._connect - - # report socket.io disconnection - self.sio.manager.__disconnect = self.sio.manager.disconnect - self.sio.manager.disconnect = self._disconnect + # report socket.io connections, disconnections and received events + self.sio.__trigger_event = self.sio._trigger_event + self.sio._trigger_event = self._trigger_event # report join rooms self.sio.manager.__basic_enter_room = \ @@ -80,10 +76,6 @@ def instrument(self): self.sio.manager.__emit = self.sio.manager.emit self.sio.manager.emit = self._emit - # report receive events - self.sio.__handle_event_internal = self.sio._handle_event_internal - self.sio._handle_event_internal = self._handle_event_internal - # report engine.io connections self.sio.eio.on('connect', self._handle_eio_connect) self.sio.eio.on('disconnect', self._handle_eio_disconnect) @@ -109,14 +101,12 @@ def instrument(self): def uninstrument(self): # pragma: no cover if self.mode == 'development': - self.sio.manager.connect = self.sio.manager.__connect - self.sio.manager.disconnect = self.sio.manager.__disconnect + self.sio._trigger_event = self.sio.__trigger_event self.sio.manager.basic_enter_room = \ self.sio.manager.__basic_enter_room self.sio.manager.basic_leave_room = \ self.sio.manager.__basic_leave_room self.sio.manager.emit = self.sio.manager.__emit - self.sio._handle_event_internal = self.sio.__handle_event_internal self.sio.eio._ok = self.sio.eio.__ok from engineio.async_socket import AsyncSocket @@ -193,26 +183,34 @@ async def shutdown(self): self.stop_stats_event.set() await asyncio.gather(self.stats_task) - async def _connect(self, eio_sid, namespace): - sid = await self.sio.manager.__connect(eio_sid, namespace) + async def _trigger_event(self, event, namespace, *args): t = time.time() - self.sio.manager._timestamps[sid] = t - serialized_socket = self.serialize_socket(sid, namespace, eio_sid) - await self.sio.emit('socket_connected', ( - serialized_socket, - datetime.utcfromtimestamp(t).isoformat() + 'Z', - ), namespace=self.admin_namespace) - return sid - - async def _disconnect(self, sid, namespace, **kwargs): - del self.sio.manager._timestamps[sid] - await self.sio.emit('socket_disconnected', ( - namespace, - sid, - 'N/A', - datetime.utcnow().isoformat() + 'Z', - ), namespace=self.admin_namespace) - return await self.sio.manager.__disconnect(sid, namespace, **kwargs) + sid = args[0] + if event == 'connect': + eio_sid = self.sio.manager.eio_sid_from_sid(sid, namespace) + self.sio.manager._timestamps[sid] = t + serialized_socket = self.serialize_socket(sid, namespace, eio_sid) + await self.sio.emit('socket_connected', ( + serialized_socket, + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + elif event == 'disconnect': + del self.sio.manager._timestamps[sid] + reason = args[1] + await self.sio.emit('socket_disconnected', ( + namespace, + sid, + reason, + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + else: + await self.sio.emit('event_received', ( + namespace, + sid, + (event, *args[1:]), + datetime.utcfromtimestamp(t).isoformat() + 'Z', + ), namespace=self.admin_namespace) + return await self.sio.__trigger_event(event, namespace, *args) async def _check_for_upgrade(self, eio_sid, sid, namespace): # pragma: no cover @@ -258,7 +256,7 @@ async def _emit(self, event, data, namespace, room=None, skip_sid=None, callback=callback, **kwargs) if namespace != self.admin_namespace: event_data = [event] + list(data) if isinstance(data, tuple) \ - else [data] + else [event, data] if not isinstance(skip_sid, list): # pragma: no branch skip_sid = [skip_sid] for sid, _ in self.sio.manager.get_participants(namespace, room): @@ -271,18 +269,6 @@ async def _emit(self, event, data, namespace, room=None, skip_sid=None, ), namespace=self.admin_namespace) return ret - async def _handle_event_internal(self, server, sid, eio_sid, data, - namespace, id): - ret = await self.sio.__handle_event_internal(server, sid, eio_sid, - data, namespace, id) - await self.sio.emit('event_received', ( - namespace, - sid, - data, - datetime.utcnow().isoformat() + 'Z', - ), namespace=self.admin_namespace) - return ret - async def _handle_eio_connect(self, eio_sid, environ): if self.stop_stats_event is None: self.stop_stats_event = self.sio.eio.create_event() @@ -292,9 +278,9 @@ async def _handle_eio_connect(self, eio_sid, environ): self.event_buffer.push('rawConnection') return await self.sio._handle_eio_connect(eio_sid, environ) - async def _handle_eio_disconnect(self, eio_sid): + async def _handle_eio_disconnect(self, eio_sid, reason): self.event_buffer.push('rawDisconnection') - return await self.sio._handle_eio_disconnect(eio_sid) + return await self.sio._handle_eio_disconnect(eio_sid, reason) def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): ret = self.sio.eio.__ok(packets=packets, headers=headers,