Skip to content

Commit

Permalink
Update objecttype to latest bidi api
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Aug 2, 2023
1 parent 0ee766b commit 282eb6d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 98 deletions.
2 changes: 1 addition & 1 deletion src/deephaven/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
from typing import Union, Type

__version__ = "0.4.0.dev0"
__version__ = "0.5.0.dev0"

DEEPHAVEN_PLUGIN_ENTRY_KEY = "deephaven.plugin"
DEEPHAVEN_PLUGIN_REGISTRATION_CLASS = "registration_cls"
Expand Down
139 changes: 42 additions & 97 deletions src/deephaven/plugin/object.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Optional, Union, Type
from typing import Optional, Union, Type, List, Any

from . import Plugin, Registration, register_all_into

Expand All @@ -17,41 +17,42 @@ def index(self) -> int:
May be used in the output stream to refer to the reference from the client."""
return self._index

@property
def type(self) -> Optional[str]:
"""The type."""
return self._type


class Exporter(abc.ABC):
"""The interface for creating new references during ObjectType.to_bytes."""
"""The interface for creating new references during FetchOnlyObjectBase.to_bytes."""

@abc.abstractmethod
def reference(
self, obj: object, allow_unknown_type: bool = False, force_new: bool = False
) -> Optional[Reference]:
"""Gets the reference for object if it has already been created and force_new is False,
otherwise creates a new one. If allow_unknown_type is False, and no type can be found, no
reference will be created."""
def reference(self, obj: object) -> Reference:
"""Creates a reference for an object. Each reference """
pass


class MessageSender(Exporter):
"""The interface for creating references and sending messages for bidirectional communication"""
class MessageStream(abc.ABC):
"""A stream of messages, either sent from server to client or client to server. ObjectType implementations
provide an instance of this interface for each incoming stream to invoke as messages arrive, and will
likewise be given an instance of this interface to be able to send messages to the client.
"""
def __init__(self):
pass

@abc.abstractmethod
def send_message(self, message: bytes) -> None:
"""Sends a message to the client"""
def on_close(self):
"""Closes the stream on both ends. No further messages can be sent or received."""
pass

@abc.abstractmethod
def on_data(self, payload: bytes, references: List[Any]):
"""Transmits data to the remote end of the stream. This can consist of a binary payload and references to
objects on the server.
"""
pass


class ObjectType(Plugin):
"""An object type plugin. Useful for serializing custom objects between the server / client."""

_message_sender: Union[MessageSender, None]

def __init__(self):
self._message_sender = None
pass

@property
@abc.abstractmethod
Expand All @@ -60,96 +61,40 @@ def name(self) -> str:
pass

@abc.abstractmethod
def is_type(self, obj: object) -> bool:
"""Returns true if, and only if, the object is compatible with this object type."""
def is_type(self, obj: Any) -> bool:
"""Returns True if, and only if, the object is compatible with this object type."""
pass

@abc.abstractmethod
def to_bytes(self, exporter: Exporter, obj: object) -> bytes:
"""Serializes object into bytes. Must only be called with a compatible object."""
pass

def supports_bidi_messaging(self, obj: object) -> bool:
"""Checks if an object supports bidirectional messaging
Default implementation checks if the object descends from BidiObjectBase
"""
return isinstance(obj, BidiObjectBase)

def add_message_sender(self, obj: object, sender: MessageSender) -> None:
"""Should NOT be called in Python. Used by server plugin adapter to pass the message sender to the object"""
if isinstance(obj, BidiObjectBase):
obj.add_message_sender(sender)

def remove_message_sender(self, obj: object) -> None:
"""Should NOT be called in Python. Used by server plugin adapter to remove the message sender from the object"""
if isinstance(obj, BidiObjectBase):
obj.remove_message_sender()

def handle_message(self, message: bytes, obj: object, objects: list[object]) -> None:
"""Called when the client sends a message to the plugin.
This default implementation delegates the message to the object the client specified
"""
if isinstance(obj, BidiObjectBase):
obj.handle_message(message, objects)


class BidiObjectBase:
"""Base class for an object which supports bidirectional streaming
Any other implementations must extend this base class so the server knows the object supports bidi communication
class BidirectionObjectType(ObjectType):
"""Base class for an object type that can continue to send responses to the client, or receive requests
from the server even after it is fetched.
"""
_dh_message_sender: Union[MessageSender, None]

def __init__(self):
self._dh_message_sender = None

pass
@abc.abstractmethod
def handle_message(self, message: bytes, objects: list[object]) -> None:
"""Used to handle messages sent by the client to the plugin
def create_client_connection(self, obj: object, connection: MessageStream) -> MessageStream:
"""Signals creation of a client stream to the specified object. The returned MessageStream implementation will
be called with each received message from the client, and can call the provided connection parameter to send
messages as needed to the client.
Args:
message (bytes): The message from the client. Unless the client specified otherwise, utf-8 encoded.
May call decode on the message to get a string representation
objects (list[object]): Any objects the client referenced in the message
Before returning, this method must call connection.on_message with some initial payload, so that the client has
an initial view of the object.
"""
pass

def add_message_sender(self, sender: MessageSender):
"""Adds a message sender to this object."""
self._dh_message_sender = sender

def remove_message_sender(self):
"""Removes the message sender from this object."""
self._dh_message_sender = None

def reference(self, obj) -> Optional[Reference]:
"""Gets the export reference to a specific object.
The reference index can be used in messages to the client to communicate about specific objects.
Calling reference on an object that is not exported will queue it for export.
The queued references will be sent to the client the next time send_message is called

Args:
obj (object): The object to reference
Returns:
The object reference for the object
"""
if self._dh_message_sender:
return self._dh_message_sender.reference(obj)

def send_message(self, message: Union[str, bytes], encoding='utf-8') -> None:
"""Used to send a message to the client
Args:
message (Union[str, bytes]): The message to send to the client. If a string, it will be encoded to bytes
encoding (str): The encoding to use for the message. Defaults to utf-8
"""
if self._dh_message_sender:
message_bytes = message if type(message) == bytes else str(message).encode(encoding=encoding)
self._dh_message_sender.send_message(message_bytes)
class FetchOnlyObjectType(ObjectType):
"""Base class for an object type which will only be fetched once, rather than support streaming requests or
responses.
"""
@abc.abstractmethod
def to_bytes(self, exporter: Exporter, obj: Any) -> bytes:
"""Serializes object into bytes. Must only be called with a compatible object."""
pass


def find_object_type(obj: object) -> Optional[ObjectType]:
def find_object_type(obj: Any) -> Optional[ObjectType]:
class Visitor(Registration.Callback):
def __init__(self) -> None:
self._found = None
Expand Down

0 comments on commit 282eb6d

Please sign in to comment.