Skip to content

Commit

Permalink
Support bidi objecttype plugins exporting new python objects (#4360)
Browse files Browse the repository at this point in the history
Attempts to sidestep #1775 by offering a consistent convention to pass
Python and Java objects between their runtimes - instead of exclusively
"wrapping" for Python and "unwrapping" to return to Java, this
encourages "javaify" to wrap or unwrap as necessary to to pass to Java,
and "pythonify" to unwrap or wrap as necessary to pass to Python. This
way, PyObjects can be avoided being passed to Java directly.

This new wrapper is a liveness node, to ensure that the surrounding
liveness scope always takes ownership on creation, even if the calling
Java code isn't aware that it has received a resource that needs to be
managed.

Fixes #4338
Co-authored-by: Ryan Caudy <ryan@deephaven.io>
  • Loading branch information
niloc132 authored Sep 6, 2023
1 parent 51f82f1 commit 46b96f6
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private Object maybeUnwrap(PyObject o) {
if (o == null) {
return null;
}
final Object javaObject = module.unwrap_to_java_type(o);
final Object javaObject = module.javaify(o);
if (javaObject != null) {
return javaObject;
}
Expand Down Expand Up @@ -309,7 +309,7 @@ public String scriptType() {
public Object unwrapObject(Object object) {
if (object instanceof PyObject) {
final PyObject pyObject = (PyObject) object;
final Object unwrapped = module.unwrap_to_java_type(pyObject);
final Object unwrapped = module.javaify(pyObject);
if (unwrapped != null) {
return unwrapped;
}
Expand All @@ -321,7 +321,7 @@ public Object unwrapObject(Object object) {
interface PythonScriptSessionModule extends Closeable {
PyObject create_change_list(PyObject from, PyObject to);

Object unwrap_to_java_type(PyObject object);
Object javaify(PyObject object);

void close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.util.Utils;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.referencecounting.ReferenceCounted;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.WeakReference;
Expand All @@ -14,7 +13,7 @@
/**
* {@link LivenessNode} implementation that relies on reference counting to determine its liveness.
*/
public abstract class ReferenceCountedLivenessNode extends ReferenceCounted implements LivenessNode {
public abstract class ReferenceCountedLivenessNode extends ReferenceCountedLivenessReferent implements LivenessNode {

final boolean enforceStrongReachability;

Expand Down Expand Up @@ -46,28 +45,6 @@ public final void initializeTransientFieldsForLiveness() {
}
}

@Override
public final boolean tryRetainReference() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return true;
}
return tryIncrementReferenceCount();
}

@Override
public final void dropReference() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return;
}
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: Releasing ").append(Utils.REFERENT_FORMATTER, this).endl();
}
if (!tryDecrementReferenceCount()) {
throw new LivenessStateException(
getReferentDescription() + " could not be released as it was no longer live");
}
}

@Override
public WeakReference<? extends LivenessReferent> getWeakReference() {
return tracker;
Expand Down Expand Up @@ -128,26 +105,9 @@ public final boolean tryUnmanage(@NotNull final Stream<? extends LivenessReferen
return true;
}

/**
* Attempt to release (destructively when necessary) resources held by this object. This may render the object
* unusable for subsequent operations. Implementations should be sure to call super.destroy().
* <p>
* This is intended to only ever be used as a side effect of decreasing the reference count to 0.
*/
protected void destroy() {}

@Override
protected final void onReferenceCountAtZero() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
throw new IllegalStateException(
"Reference count on " + this + " reached zero while liveness reference tracking is disabled");
}
try {
destroy();
} catch (Exception e) {
Liveness.log.warn().append("Exception while destroying ").append(Utils.REFERENT_FORMATTER, this)
.append(" after reference count reached zero: ").append(e).endl();
}
public final void onReferenceCountAtZero() {
super.onReferenceCountAtZero();
tracker.ensureReferencesDropped();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.deephaven.engine.liveness;

import io.deephaven.util.Utils;
import io.deephaven.util.referencecounting.ReferenceCounted;

import java.lang.ref.WeakReference;

/**
* {@link LivenessReferent} implementation that relies on reference counting to determine its liveness.
*/
public class ReferenceCountedLivenessReferent extends ReferenceCounted implements LivenessReferent {

public final boolean tryRetainReference() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return true;
}
return tryIncrementReferenceCount();
}

public final void dropReference() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return;
}
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: Releasing ").append(Utils.REFERENT_FORMATTER, this).endl();
}
if (!tryDecrementReferenceCount()) {
throw new LivenessStateException(
getReferentDescription() + " could not be released as it was no longer live");
}
}

@Override
public WeakReference<? extends LivenessReferent> getWeakReference() {
return new WeakReference<>(this);
}

/**
* Attempt to release (destructively when necessary) resources held by this object. This may render the object
* unusable for subsequent operations. Implementations should be sure to call super.destroy().
* <p>
* This is intended to only ever be used as a side effect of decreasing the reference count to 0.
*/
protected void destroy() {}

@Override
protected void onReferenceCountAtZero() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
throw new IllegalStateException(
"Reference count on " + this + " reached zero while liveness reference tracking is disabled");
}
try {
destroy();
} catch (Exception e) {
Liveness.log.warn().append("Exception while destroying ").append(Utils.REFERENT_FORMATTER, this)
.append(" after reference count reached zero: ").append(e).endl();
}
}
}
45 changes: 45 additions & 0 deletions py/server/deephaven/_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
_di_wrapper_classes: Set[JObjectWrapper] = set()
_has_all_wrappers_imported = False

JLivePyObjectWrapper = jpy.get_type('io.deephaven.server.plugin.python.LivePyObjectWrapper')


def _recursive_import(package_path: str) -> None:
""" Recursively import every module in a package. """
Expand Down Expand Up @@ -117,6 +119,49 @@ def _lookup_wrapped_class(j_obj: jpy.JType) -> Optional[type]:
return None


def javaify(obj: Any) -> Optional[jpy.JType]:
"""
Returns an object that is safe to pass to Java. Callers should take care to ensure that this happens
in a liveness scope that reflects the lifetime of the reference to be passed to Java.
The implementation will return a Java object that can be passed over jpy as a java.lang.Object. An
existing java.lang.Object passed in to this method will be returned as-is, a JObjectWrapper will be
unwrapped to return its underlying Java object, and anything else will be wrapped in a LivePyObjectWrapper.
https://github.com/deephaven/deephaven-core/issues/1775
"""
if obj is None:
return None
if isinstance(obj, JObjectWrapper):
return obj.j_object
if isinstance(obj, jpy.JType):
return obj
# We must return a java object, so wrap in a PyObjectLivenessNode so that the server's liveness tracking
# will correctly notify python that the object was released
return JLivePyObjectWrapper(obj)


def pythonify(j_obj: Any) -> Optional[Any]:
"""
Reciprocal of javaify, returns an object that is safe to be used in Python after being passed
from Java.
The implementation will return a Python object both when a Python object is passed in, or if a
LivePyObjectWrapper was passed in. Otherwise, delegates to wrap_j_object to attempt to wrap the
Java object, and if no wrapper is known, returns the Java object itself.
Where possible, when passing a python object or wrapper from Java, unwrap from LivePyObjectWrapper
to PyObject to avoid excess JNI/GIL overhead.
"""
if not isinstance(j_obj, jpy.JType):
return j_obj
# Definitely a JType, check if it is a LivePyObjectWrapper
if j_obj.jclass == JLivePyObjectWrapper.jclass:
return j_obj.getPythonObject()
# Vanilla Java object, see if we have explicit wrapping for it
return wrap_j_object(j_obj)


def wrap_j_object(j_obj: jpy.JType) -> Union[JObjectWrapper, jpy.JType]:
""" Wraps the specified Java object as an instance of a custom wrapper class if one is available, otherwise returns
the raw Java object. """
Expand Down
8 changes: 4 additions & 4 deletions py/server/deephaven/appmode.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper, wrap_j_object, unwrap
from deephaven._wrapper import JObjectWrapper, pythonify, javaify

_JApplicationContext = jpy.get_type("io.deephaven.appmode.ApplicationContext")
_JApplicationState = jpy.get_type("io.deephaven.appmode.ApplicationState")
Expand Down Expand Up @@ -36,11 +36,11 @@ def __getitem__(self, item):
j_field = self.j_app_state.getField(item)
if not j_field:
raise KeyError(item)
return wrap_j_object(j_field.value())
return pythonify(j_field.value())

def __setitem__(self, key, value):
key = str(key)
self.j_app_state.setField(key, unwrap(value))
self.j_app_state.setField(key, javaify(value))

def __delitem__(self, key):
key = str(key)
Expand All @@ -54,7 +54,7 @@ def fields(self) -> Dict[str, object]:
j_fields = self.j_app_state.listFields()
for i in range(j_fields.size()):
j_field = j_fields.get(i)
fields[j_field.name()] = wrap_j_object(j_field.value())
fields[j_field.name()] = pythonify(j_field.value())

return fields

Expand Down
34 changes: 15 additions & 19 deletions py/server/deephaven_internal/plugin/object/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,29 @@

from typing import Optional, List, Any
from deephaven.plugin.object_type import Exporter, ObjectType, Reference, MessageStream, FetchOnlyObjectType
from deephaven._wrapper import JObjectWrapper, wrap_j_object
from deephaven._wrapper import pythonify, javaify
from deephaven.liveness_scope import liveness_scope

JReference = jpy.get_type('io.deephaven.plugin.type.Exporter$Reference')
JExporterAdapter = jpy.get_type('io.deephaven.server.plugin.python.ExporterAdapter')
JMessageStream = jpy.get_type('io.deephaven.plugin.type.ObjectType$MessageStream')
JPyObjectRefCountedNode = jpy.get_type('io.deephaven.server.plugin.python.LivePyObjectWrapper')


def _adapt_reference(ref: JReference) -> Reference:
return Reference(ref.index(), ref.type().orElse(None))


def _unwrap(object):
# todo: we should have generic unwrapping code ABC
if isinstance(object, JObjectWrapper):
return object.j_object
return object


class ExporterAdapter(Exporter):
"""Python implementation of Exporter that delegates to its Java counterpart."""

def __init__(self, exporter: JExporterAdapter):
self._exporter = exporter

def reference(self, obj: Any, allow_unknown_type: bool = True, force_new: bool = True) -> Optional[Reference]:
obj = _unwrap(obj)
if isinstance(obj, jpy.JType):
ref = self._exporter.reference(obj, allow_unknown_type, force_new)
else:
ref = self._exporter.referencePyObject(obj, allow_unknown_type, force_new)
# No liveness scope required here, this must be called from the same thread as the call from gRPC
obj = javaify(obj)
ref = self._exporter.reference(obj, allow_unknown_type, force_new)
return _adapt_reference(ref) if ref else None

def __str__(self):
Expand All @@ -48,7 +41,10 @@ def __init__(self, wrapped: JMessageStream):
self._wrapped = wrapped

def on_data(self, payload: bytes, references: List[Any]) -> None:
self._wrapped.onData(payload, [_unwrap(ref) for ref in references])
# Perform this in a single liveness scope to ensure we safely create PyObjectRefCountedNodes
# and pass them off to Java, which now owns them
with liveness_scope():
self._wrapped.onData(payload, [javaify(ref) for ref in references])

def on_close(self) -> None:
self._wrapped.onClose()
Expand All @@ -60,8 +56,8 @@ class ServerRequestStreamAdapter(MessageStream):
def __init__(self, wrapped: MessageStream):
self._wrapped = wrapped

def on_data(self, payload:bytes, references: List[Any]) -> None:
self._wrapped.on_data(payload, [wrap_j_object(ref) for ref in references])
def on_data(self, payload: bytes, references: List[Any]) -> None:
self._wrapped.on_data(payload, [pythonify(ref) for ref in references])

def on_close(self) -> None:
self._wrapped.on_close()
Expand All @@ -74,17 +70,17 @@ def __init__(self, user_object_type: ObjectType):
self._user_object_type = user_object_type

def is_type(self, obj) -> bool:
return self._user_object_type.is_type(obj)
return self._user_object_type.is_type(pythonify(obj))

def is_fetch_only(self) -> bool:
return isinstance(self._user_object_type, FetchOnlyObjectType)

def to_bytes(self, exporter: JExporterAdapter, obj: Any) -> bytes:
return self._user_object_type.to_bytes(ExporterAdapter(exporter), obj)
return self._user_object_type.to_bytes(ExporterAdapter(exporter), pythonify(obj))

def create_client_connection(self, obj: Any, connection: JMessageStream) -> MessageStream:
return ServerRequestStreamAdapter(
self._user_object_type.create_client_connection(obj, ClientResponseStreamAdapter(connection))
self._user_object_type.create_client_connection(pythonify(obj), ClientResponseStreamAdapter(connection))
)

def __str__(self):
Expand Down
15 changes: 5 additions & 10 deletions py/server/deephaven_internal/script_session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Implementation utilities for io.deephaven.engine.util.PythonDeephavenSession
from jpy import JType

from deephaven._wrapper import JObjectWrapper
from deephaven import _wrapper


def create_change_list(from_snapshot, to_snapshot):
Expand Down Expand Up @@ -36,17 +36,12 @@ def make_change_item(name, existing_value, new_value):
return name, existing_value, new_value


def unwrap_to_java_type(object):
def javaify(obj) -> JType:
"""
Returns a JType object if the object is already a JType, or if the object can be unwrapped into a JType object;
otherwise, returns None.
otherwise, wraps the object in a LivenessArtifact to ensure it is freed in Python correctly.
:param object: the object to be unwrapped
:return: the JType object, or None
:return: the JType object
"""
if isinstance(object, JType):
return object
if isinstance(object, JObjectWrapper):
return object.j_object
# add more here when/if necessary
return None
return _wrapper.javaify(obj)
Loading

0 comments on commit 46b96f6

Please sign in to comment.