From c6018bc01f9c41492673e285ecc1ee38ec9e078c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 22 Dec 2023 11:36:22 +0100 Subject: [PATCH] Revert PR #170 --- jupyter_collaboration/handlers.py | 3 - jupyter_collaboration/rooms.py | 113 +++----------------------- jupyter_collaboration/utils.py | 9 -- packages/docprovider/src/utils.ts | 9 -- packages/docprovider/src/yprovider.ts | 68 ---------------- tests/test_rooms.py | 45 ---------- tests/utils.py | 11 --- 7 files changed, 13 insertions(+), 245 deletions(-) diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index e543a5f5..5a760b1b 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -254,9 +254,6 @@ async def on_message(self, message): ) return skip - if message_type == MessageType.ROOM: - await self.room.handle_msg(message[1:]) - if message_type == MessageType.CHAT: msg = message[2:].decode("utf-8") diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 543d664e..eb70423c 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -4,7 +4,6 @@ from __future__ import annotations import asyncio -import uuid from logging import Logger from typing import Any @@ -12,16 +11,9 @@ from jupyter_ydoc import ydocs as YDOCS from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore, YDocNotFound -from pycrdt_websocket.yutils import write_var_uint from .loaders import FileLoader -from .utils import ( - JUPYTER_COLLABORATION_EVENTS_URI, - LogLevel, - MessageType, - OutOfBandChanges, - RoomMessages, -) +from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges YFILE = YDOCS["file"] @@ -53,7 +45,6 @@ def __init__( self._save_delay = save_delay self._update_lock = asyncio.Lock() - self._outofband_lock = asyncio.Lock() self._initialization_lock = asyncio.Lock() self._cleaner: asyncio.Task | None = None self._saving_document: asyncio.Task | None = None @@ -159,41 +150,6 @@ async def initialize(self) -> None: self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") - async def handle_msg(self, data: bytes) -> None: - msg_type = data[0] - msg_id = data[2:].decode() - - # Use a lock to prevent handling responses from multiple clients - # at the same time - async with self._messages[msg_id]: - # Check whether the previous client resolved the conflict - if msg_id not in self._messages: - return - - try: - ans = None - if msg_type == RoomMessages.RELOAD: - # Restore the room with the content from disk - await self._load_document() - ans = RoomMessages.DOC_OVERWRITTEN - - elif msg_type == RoomMessages.OVERWRITE: - # Overwrite the file with content from the room - await self._save_document() - ans = RoomMessages.FILE_OVERWRITTEN - - if ans is not None: - # Remove the lock and broadcast the resolution - self._messages.pop(msg_id) - data = msg_id.encode() - self._outofband_lock.release() - await self._broadcast_msg( - bytes([MessageType.ROOM, ans]) + write_var_uint(len(data)) + data - ) - - except Exception: - return - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} if action: @@ -232,24 +188,24 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None: event (str): Type of change. args (dict): A dictionary with format, type, last_modified. """ - if self._outofband_lock.locked(): - return - if event == "metadata" and ( self._last_modified is None or self._last_modified < args["last_modified"] ): self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") - msg_id = str(uuid.uuid4()) - self._messages[msg_id] = asyncio.Lock() - await self._outofband_lock.acquire() - data = msg_id.encode() - await self._broadcast_msg( - bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) - + write_var_uint(len(data)) - + data - ) + try: + model = await self._file.load_content(self._file_format, self._file_type, True) + except Exception as e: + msg = f"Error loading content from file: {self._file.path}\n{e!r}" + self.log.error(msg, exc_info=e) + self._emit(LogLevel.ERROR, None, msg) + return None + + async with self._update_lock: + self._document.source = model["content"] + self._last_modified = model["last_modified"] + self._document.dirty = False def _on_document_change(self, target: str, event: Any) -> None: """ @@ -276,45 +232,6 @@ def _on_document_change(self, target: str, event: Any) -> None: self._saving_document = asyncio.create_task(self._maybe_save_document()) - async def _load_document(self) -> None: - try: - model = await self._file.load_content(self._file_format, self._file_type, True) - except Exception as e: - msg = f"Error loading content from file: {self._file.path}\n{e!r}" - self.log.error(msg, exc_info=e) - self._emit(LogLevel.ERROR, None, msg) - return None - - async with self._update_lock: - self._document.source = model["content"] - self._last_modified = model["last_modified"] - self._document.dirty = False - - async def _save_document(self) -> None: - """ - Saves the content of the document to disk. - """ - try: - self.log.info("Saving the content from room %s", self._room_id) - model = await self._file.save_content( - { - "format": self._file_format, - "type": self._file_type, - "last_modified": self._last_modified, - "content": self._document.source, - } - ) - self._last_modified = model["last_modified"] - async with self._update_lock: - self._document.dirty = False - - self._emit(LogLevel.INFO, "save", "Content saved.") - - except Exception as e: - msg = f"Error saving file: {self._file.path}\n{e!r}" - self.log.error(msg, exc_info=e) - self._emit(LogLevel.ERROR, None, msg) - async def _maybe_save_document(self) -> None: """ Saves the content of the document to disk. @@ -368,10 +285,6 @@ async def _maybe_save_document(self) -> None: self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) - async def _broadcast_msg(self, msg: bytes) -> None: - for client in self.clients: - await client.send(msg) - class TransientRoom(YRoom): """A Y room for sharing state (e.g. awareness).""" diff --git a/jupyter_collaboration/utils.py b/jupyter_collaboration/utils.py index cf1c7bc8..e6c974cf 100644 --- a/jupyter_collaboration/utils.py +++ b/jupyter_collaboration/utils.py @@ -12,18 +12,9 @@ class MessageType(IntEnum): SYNC = 0 AWARENESS = 1 - ROOM = 124 CHAT = 125 -class RoomMessages(IntEnum): - RELOAD = 0 - OVERWRITE = 1 - FILE_CHANGED = 2 - FILE_OVERWRITTEN = 3 - DOC_OVERWRITTEN = 4 - - class LogLevel(Enum): INFO = "INFO" DEBUG = "DEBUG" diff --git a/packages/docprovider/src/utils.ts b/packages/docprovider/src/utils.ts index 47815952..ff937a76 100644 --- a/packages/docprovider/src/utils.ts +++ b/packages/docprovider/src/utils.ts @@ -4,14 +4,5 @@ |----------------------------------------------------------------------------*/ export enum MessageType { - ROOM = 124, CHAT = 125 } - -export enum RoomMessage { - RELOAD = 0, - OVERWRITE = 1, - FILE_CHANGED = 2, - FILE_OVERWRITTEN = 3, - DOC_OVERWRITTEN = 4 -} diff --git a/packages/docprovider/src/yprovider.ts b/packages/docprovider/src/yprovider.ts index 566b3f0f..4061ebeb 100644 --- a/packages/docprovider/src/yprovider.ts +++ b/packages/docprovider/src/yprovider.ts @@ -13,13 +13,10 @@ import { Signal } from '@lumino/signaling'; import { DocumentChange, YDocument } from '@jupyter/ydoc'; -import * as decoding from 'lib0/decoding'; -import * as encoding from 'lib0/encoding'; import { Awareness } from 'y-protocols/awareness'; import { WebsocketProvider as YWebsocketProvider } from 'y-websocket'; import { requestDocSession } from './requests'; -import { MessageType, RoomMessage } from './utils'; /** * An interface for a document provider. @@ -114,18 +111,6 @@ export class WebSocketProvider implements IDocumentProvider { this._yWebsocketProvider.on('sync', this._onSync); this._yWebsocketProvider.on('connection-close', this._onConnectionClosed); - - this._yWebsocketProvider.messageHandlers[MessageType.ROOM] = ( - encoder, - decoder, - provider, - emitSynced, - messageType - ) => { - const msgType = decoding.readVarUint(decoder); - const data = decoding.readVarString(decoder); - this._handleRoomMessage(msgType, data); - }; } private _onUserChanged(user: User.IManager): void { @@ -153,59 +138,6 @@ export class WebSocketProvider implements IDocumentProvider { } }; - private _handleRoomMessage(type: number, data: string): void { - switch (type) { - case RoomMessage.FILE_CHANGED: - this._handleFileChanged(data); - break; - - case RoomMessage.DOC_OVERWRITTEN: - case RoomMessage.FILE_OVERWRITTEN: - if (this._dialog) { - this._dialog.close(); - this._dialog = null; - } - break; - } - } - - private _handleFileChanged(data: string): void { - this._dialog = new Dialog({ - title: this._trans.__('File changed'), - body: this._trans.__('Do you want to overwrite the file or reload it?'), - buttons: [ - Dialog.okButton({ label: 'Reload' }), - Dialog.warnButton({ label: 'Overwrite' }) - ], - hasClose: false - }); - - this._dialog.launch().then(resp => { - if (resp.button.label === 'Reload') { - this._sendReloadMsg(data); - } else if (resp.button.label === 'Overwrite') { - this._sendOverwriteMsg(data); - } - }); - } - - private _sendReloadMsg(data: string): void { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, MessageType.ROOM); - encoding.writeVarUint(encoder, RoomMessage.RELOAD); - encoding.writeVarString(encoder, data); - this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder)); - } - - private _sendOverwriteMsg(data: string): void { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, MessageType.ROOM); - encoding.writeVarUint(encoder, RoomMessage.OVERWRITE); - encoding.writeVarString(encoder, data); - this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder)); - } - - private _dialog: Dialog | null = null; private _awareness: Awareness; private _contentType: string; private _format: string; diff --git a/tests/test_rooms.py b/tests/test_rooms.py index 8f788687..b3f58872 100644 --- a/tests/test_rooms.py +++ b/tests/test_rooms.py @@ -4,12 +4,9 @@ from __future__ import annotations import asyncio -from datetime import datetime from jupyter_ydoc import YUnicode -from .utils import overite_msg, reload_msg - async def test_should_initialize_document_room_without_store(rtc_create_mock_document_room): content = "test" @@ -78,45 +75,3 @@ async def test_undefined_save_delay_should_not_save_content_after_document_chang await asyncio.sleep(0.15) assert "save" not in cm.actions - - -async def test_should_reload_content_from_disk(rtc_create_mock_document_room): - content = "test" - last_modified = datetime.now() - - cm, loader, room = rtc_create_mock_document_room( - "test-id", "test.txt", "whatever", last_modified - ) - - await room.initialize() - - # Make sure the time increases - cm.model["last_modified"] = datetime.fromtimestamp(last_modified.timestamp() + 1) - cm.model["content"] = content - - await loader.notify() - - msg_id = next(iter(room._messages)).encode("utf8") - await room.handle_msg(reload_msg(msg_id)) - - assert room._document.source == content - - -async def test_should_not_reload_content_from_disk(rtc_create_mock_document_room): - content = "test" - last_modified = datetime.now() - - cm, loader, room = rtc_create_mock_document_room("test-id", "test.txt", content, last_modified) - - await room.initialize() - - # Make sure the time increases - cm.model["last_modified"] = datetime.fromtimestamp(last_modified.timestamp() + 1) - cm.model["content"] = "whatever" - - await loader.notify() - - msg_id = list(room._messages.keys())[0].encode("utf8") - await room.handle_msg(overite_msg(msg_id)) - - assert room._document.source == content diff --git a/tests/utils.py b/tests/utils.py index c25e3f29..8114b673 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,9 +7,6 @@ from typing import Any from jupyter_server import _tz as tz -from pycrdt_websocket.yutils import write_var_uint - -from jupyter_collaboration.utils import RoomMessages class FakeFileIDManager: @@ -55,11 +52,3 @@ def save_content(self, model: dict[str, Any], path: str) -> dict: class FakeEventLogger: def emit(self, schema_id: str, data: dict) -> None: print(data) - - -def reload_msg(msg_id: str) -> bytearray: - return bytes([RoomMessages.RELOAD]) + write_var_uint(len(msg_id)) + msg_id - - -def overite_msg(msg_id: str) -> bytearray: - return bytes([RoomMessages.OVERWRITE]) + write_var_uint(len(msg_id)) + msg_id