Skip to content

Commit

Permalink
Websocket fixes
Browse files Browse the repository at this point in the history
Mostly no need to record a client in the database anynmore
  • Loading branch information
d-j-hatton committed Sep 13, 2024
1 parent 6b3b350 commit edc3f8e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import threading
from dataclasses import dataclass, field
Expand Down Expand Up @@ -81,7 +82,7 @@ def __post_init__(self):

self.ws = murfey.client.websocket.WSApp(
server=self.murfey_url,
id=0,
register_client=False,
)

def _start_rsyncer_multigrid(
Expand Down Expand Up @@ -133,7 +134,7 @@ def _start_rsyncer_multigrid(
tag=tag,
limited=limited,
)
self.ws.send({"message": "refresh"})
self.ws.send(json.dumps({"message": "refresh"}))

def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
if explicit_stop:
Expand Down
15 changes: 11 additions & 4 deletions src/murfey/client/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import json
import logging
import queue
import random
import threading
import time
import urllib.parse
import uuid
from typing import Optional

import websocket
Expand All @@ -19,8 +19,10 @@
class WSApp:
environment: MurfeyInstanceEnvironment | None = None

def __init__(self, *, server: str, id: int | None = None):
self.id = random.randint(0, 100) if id is None else id
def __init__(
self, *, server: str, id: int | str | None = None, register_client: bool = True
):
self.id = uuid.uuid4() if id is None else id
log.info(f"Opening websocket connection for Client {self.id}")
websocket.enableTrace(True)
url = urllib.parse.urlparse(server)._replace(scheme="ws", path="")
Expand All @@ -29,8 +31,13 @@ def __init__(self, *, server: str, id: int | None = None):
self._ready = False
self._send_queue: queue.Queue[Optional[str]] = queue.Queue()
self._receive_queue: queue.Queue[Optional[str]] = queue.Queue()
ws_url = (
url._replace(path=f"/ws/test/{self.id}").geturl()
if register_client
else url._replace(path=f"/ws/connect/{self.id}").geturl()
)
self._ws = websocket.WebSocketApp(
url._replace(path=f"/ws/test/{id}").geturl(),
ws_url,
on_close=self.on_close,
on_message=self.on_message,
on_open=self.on_open,
Expand Down
67 changes: 56 additions & 11 deletions src/murfey/server/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@

class ConnectionManager(Generic[T]):
def __init__(self, state: State[T]):
self.active_connections: Dict[int, WebSocket] = {}
self.active_connections: Dict[int | str, WebSocket] = {}
self._state = state
self._state.subscribe(self._broadcast_state_update)

async def connect(self, websocket: WebSocket, client_id: int):
async def connect(
self, websocket: WebSocket, client_id: int | str, register_client: bool = True
):
await websocket.accept()
self.active_connections[client_id] = websocket
self._register_new_client(client_id)
if register_client:
if not isinstance(client_id, int):
raise ValueError(
"To register a client the client ID must be an integer"
)
self._register_new_client(client_id)
await websocket.send_json({"message": "state-full", "state": self._state.data})

@staticmethod
Expand All @@ -40,15 +47,20 @@ def _register_new_client(client_id: int):
murfey_db.commit()
murfey_db.close()

def disconnect(self, websocket: WebSocket, client_id: int):
def disconnect(
self, websocket: WebSocket, client_id: int | str, unregister_client: bool = True
):
self.active_connections.pop(client_id)
murfey_db = next(get_murfey_db_session())
client_env = murfey_db.exec(
select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
).one()
murfey_db.delete(client_env)
murfey_db.commit()
murfey_db.close()
if unregister_client:
murfey_db = next(get_murfey_db_session())
client_env = murfey_db.exec(
select(ClientEnvironment).where(
ClientEnvironment.client_id == client_id
)
).one()
murfey_db.delete(client_env)
murfey_db.commit()
murfey_db.close()

async def broadcast(self, message: str):
for connection in self.active_connections:
Expand Down Expand Up @@ -101,6 +113,31 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.delete_state(f"Client {client_id}")


@ws.websocket("/connect/{client_id}")
async def websocket_connection_endpoint(websocket: WebSocket, client_id: int | str):
await manager.connect(websocket, client_id, register_client=False)
await manager.broadcast(f"Client {client_id} joined")
await manager.set_state(f"Client {client_id}", "joined")
try:
while True:
data = await websocket.receive_text()
try:
json_data = json.loads(data)
if json_data.get("type") == "log": # and isinstance(json_data, dict)
json_data.pop("type")
await forward_log(json_data, websocket)
elif json_data.get("message") == "refresh":
await manager.broadcast(json.dumps(json_data))

except Exception:
await manager.broadcast(f"Client #{client_id} sent message {data}")
except WebSocketDisconnect:
log.info(f"Disconnecting Client {client_id}")
manager.disconnect(websocket, client_id, unregister_client=False)
await manager.broadcast(f"Client #{client_id} disconnected")
await manager.delete_state(f"Client {client_id}")


async def check_connections(active_connections):
log.info("Checking connections")
for connection in active_connections:
Expand Down Expand Up @@ -139,3 +176,11 @@ async def close_ws_connection(client_id: int):
manager.disconnect(manager.active_connections[client_id], client_id)
prom.monitoring_switch.labels(visit=visit_name).set(0)
await manager.broadcast(f"Client #{client_id} disconnected")


@ws.delete("/connect/{client_id}")
async def close_unrecorded_ws_connection(client_id: int | str):
client_id_str = str(client_id).replace("\r\n", "").replace("\n", "")
log.info(f"Disconnecting {client_id_str}")
manager.disconnect(manager.active_connections[client_id], client_id)
await manager.broadcast(f"Client #{client_id} disconnected")

0 comments on commit edc3f8e

Please sign in to comment.