Skip to content

Commit

Permalink
Migrate deephaven/__init__.py to v2 (#2036)
Browse files Browse the repository at this point in the history
* Migrate deephaven/__init__.py to v2
  • Loading branch information
jmao-denver authored Feb 28, 2022
1 parent 2ae657a commit 81a9758
Show file tree
Hide file tree
Showing 6 changed files with 475 additions and 17 deletions.
40 changes: 31 additions & 9 deletions pyintegration/deephaven2/_jcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
#
""" This module provides Java compatibility support including convenience functions to create some widely used Java
data structures from corresponding Python ones in order to be able to call Java methods. """
from typing import Any, Iterable, Dict, Set

from typing import Any, Iterable, Dict, Set, TypeVar, Callable

import jpy
from deephaven2.dtypes import DType


def is_java_type(obj: Any) -> bool:
""" Returns True if the object is originated in Java. """
"""Returns True if the object is originated in Java."""
return isinstance(obj, jpy.JType)


def j_array_list(values: Iterable = None) -> jpy.JType:
""" Creates a Java ArrayList instance from an iterable. """
"""Creates a Java ArrayList instance from an iterable."""
if values is None:
return None
r = jpy.get_type("java.util.ArrayList")(len(list(values)))
Expand All @@ -24,20 +26,20 @@ def j_array_list(values: Iterable = None) -> jpy.JType:


def j_hashmap(d: Dict = None) -> jpy.JType:
""" Creates a Java HashMap from a dict. """
"""Creates a Java HashMap from a dict."""
if d is None:
return None

r = jpy.get_type("java.util.HashMap")()
for key, value in d.items():
if value is None:
value = ''
value = ""
r.put(key, value)
return r


def j_hashset(s: Set = None) -> jpy.JType:
""" Creates a Java HashSet from a set. """
"""Creates a Java HashSet from a set."""
if s is None:
return None

Expand All @@ -48,19 +50,19 @@ def j_hashset(s: Set = None) -> jpy.JType:


def j_properties(d: Dict = None) -> jpy.JType:
""" Creates a Java Properties from a dict. """
"""Creates a Java Properties from a dict."""
if d is None:
return None
r = jpy.get_type("java.util.Properties")()
for key, value in d.items():
if value is None:
value = ''
value = ""
r.setProperty(key, value)
return r


def j_map_to_dict(m):
""" Converts a java map to a python dictionary. """
"""Converts a java map to a python dictionary."""
if not m:
return None

Expand All @@ -71,3 +73,23 @@ def j_map_to_dict(m):
r[k] = v

return r


T = TypeVar("T")
R = TypeVar("R")


def j_function(func: Callable[[T], R], dtype: DType) -> jpy.JType:
"""Constructs a Java 'Function<PyObject, Object>' implementation from a Python callable or an object with an
'apply' method that accepts a single argument.
Args:
func (Callable): a Python callable or an object with an 'apply' method that accepts a single argument
dtype (DType): the return type of 'func'
Returns:
io.deephaven.integrations.python.PythonFunction instance
"""
return jpy.get_type("io.deephaven.integrations.python.PythonFunction")(
func, dtype.qst_type.clazz()
)
7 changes: 3 additions & 4 deletions pyintegration/deephaven2/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ def _to_column_name(name: str) -> str:
def _column_to_numpy_array(col_def: Column, j_array: jpy.JType) -> np.ndarray:
""" Produces a numpy array from the given Java array and the Table column definition. """
try:
if col_def.data_type in {dtypes.char, dtypes.string}:
return np.array(j_array)

if col_def.data_type == dtypes.DateTime:
if col_def.data_type.is_primitive:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
elif col_def.data_type == dtypes.DateTime:
longs = _JPrimitiveArrayConversionUtility.translateArrayDateTimeToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(col_def.data_type.np_type)
Expand Down
279 changes: 279 additions & 0 deletions pyintegration/deephaven2/table_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module provides utilities for listening to table changes. """
from __future__ import annotations

from abc import ABC, abstractmethod
from inspect import signature
from typing import Callable, Union, Type

import jpy

from deephaven2 import DHError
from deephaven2.table import Table


class TableListenerHandle:
"""A handle for a table listener."""

def __init__(self, t: Table, listener):
"""Creates a new table listener handle.
Args:
t (Table): table being listened to
listener: listener object
"""
self.t = t
self.listener = listener
self.isRegistered = False

def register(self) -> None:
"""Register the listener with the table and listen for updates.
Raises:
RuntimeError
"""

if self.isRegistered:
raise RuntimeError(
"Attempting to register an already registered listener.."
)

self.t.j_table.listenForUpdates(self.listener)

self.isRegistered = True

def deregister(self) -> None:
"""Deregister the listener from the table and stop listening for updates.
Raises:
RuntimeError
"""

if not self.isRegistered:
raise RuntimeError("Attempting to deregister an unregistered listener..")

self.t.j_table.removeUpdateListener(self.listener)
self.isRegistered = False


def _do_locked(f: Callable, lock_type="shared") -> None:
"""Executes a function while holding the UpdateGraphProcessor (UGP) lock. Holding the UGP lock
ensures that the contents of a table will not change during a computation, but holding
the lock also prevents table updates from happening. The lock should be held for as little
time as possible.
Args:
f (Callable): callable to execute while holding the UGP lock, could be function or an object with an 'apply'
attribute which is callable
lock_type (str): UGP lock type, valid values are "exclusive" and "shared". "exclusive" allows only a single
reader or writer to hold the lock. "shared" allows multiple readers or a single writer to hold the lock.
Raises:
ValueError
"""
throwing_runnable = jpy.get_type(
"io.deephaven.integrations.python.PythonThrowingRunnable"
)
update_graph_processor = jpy.get_type(
"io.deephaven.engine.updategraph.UpdateGraphProcessor"
)

if lock_type == "exclusive":
update_graph_processor.DEFAULT.exclusiveLock().doLocked(throwing_runnable(f))
elif lock_type == "shared":
update_graph_processor.DEFAULT.sharedLock().doLocked(throwing_runnable(f))
else:
raise ValueError(f"Unsupported lock type: lock_type={lock_type}")


def _nargs_listener(listener) -> int:
"""Returns the number of arguments the listener takes.
Args:
listener: listener
Returns:
the number of arguments the listener takes
Raises:
ValueError, NotImplementedError
"""

if callable(listener):
f = listener
elif hasattr(listener, "onUpdate"):
f = listener.onUpdate
else:
raise ValueError(
"ShiftObliviousListener is neither callable nor has an 'onUpdate' method"
)

return len(signature(f).parameters)


class TableListener(ABC):
"""An abstract table listener class that should be subclassed by any user Table listener class."""

@abstractmethod
def onUpdate(self, *args, **kwargs) -> None:
...


def listen(
t: Table,
listener: Union[Callable, Type[TableListener]],
description: str = None,
retain: bool = True,
listener_type: str = "auto",
start_listening: bool = True,
replay_initial: bool = False,
lock_type: str = "shared",
) -> TableListenerHandle:
"""Listen to table changes.
Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
(2) an object that provides an "onUpdate" method.
In either case, the method must have one of the following signatures.
* (added, removed, modified): shift_oblivious
* (isReplay, added, removed, modified): shift_oblivious + replay
* (update): shift-aware
* (isReplay, update): shift-aware + replay
For shift-oblivious listeners, 'added', 'removed', and 'modified' are the indices of the rows which changed.
For shift-aware listeners, 'update' is an object that describes the table update.
Listeners that support replaying the initial table snapshot have an additional parameter, 'isReplay', which is
true when replaying the initial snapshot and false during normal updates.
See the Deephaven listener documentation for details on processing update events. This documentation covers the
details of the added, removed, and modified row sets; row shift information; as well as the details of how to apply
the update object. It also has examples on how to access current and previous-tick table values.
Args:
t (Table): table to listen to
listener (Callable): listener for table changes
description (str): description for the UpdatePerformanceTracker to append to the listener's entry description
retain (bool): whether a hard reference to this listener should be maintained to prevent it from being
collected, default is True
listener_type (str): listener type, valid values are "auto", "shift_oblivious", and "shift_aware"
"auto" (default) uses inspection to automatically determine the type of input listener
"shift_oblivious" is for a shift_oblivious listener, which takes three (added, removed, modified) or four
(isReplay, added, removed, modified) arguments
"shift_aware" is for a shift-aware listener, which takes one (update) or two (isReplay, update) arguments
start_listening (bool): True to create the listener and register the listener with the table. The listener will
see updates. False to create the listener, but do not register the listener with the table. The listener
will not see updates. Default is True
replay_initial (bool): True to replay the initial table contents to the listener. False to only listen to new
table changes. To replay the initial image, the listener must support replay. Default is False
lock_type (str): UGP lock type, used when replay_initial=True, valid values are "exclusive" and "shared".
Default is "shared".
Returns:
a TableListenerHandle instance
Raises:
DHError
"""

try:
if replay_initial and not start_listening:
raise ValueError(
"Unable to create listener. Inconsistent arguments. If the initial snapshot is replayed ("
"replay_initial=True), then the listener must be registered to start listening ("
"start_listening=True)."
)

nargs = _nargs_listener(listener)

if listener_type is None or listener_type == "auto":
if nargs == 1 or nargs == 2:
listener_type = "shift_aware"
elif nargs == 3 or nargs == 4:
listener_type = "shift_oblivious"
else:
raise ValueError(
f"Unable to autodetect listener type. ShiftObliviousListener does not take an expected number of "
f"arguments. args={nargs}."
)

if listener_type == "shift_oblivious":
if nargs == 3:
if replay_initial:
raise ValueError(
"ShiftObliviousListener does not support replay: ltype={} nargs={}".format(
listener_type, nargs
)
)

listener_adapter = jpy.get_type(
"io.deephaven.integrations.python.PythonShiftObliviousListenerAdapter"
)
listener_adapter = listener_adapter(
description, t.j_table, retain, listener
)
elif nargs == 4:
listener_adapter = jpy.get_type(
"io.deephaven.integrations.python"
".PythonReplayShiftObliviousListenerAdapter"
)
listener_adapter = listener_adapter(
description, t.j_table, retain, listener
)
else:
raise ValueError(
"Legacy listener must take 3 (added, removed, modified) or 4 (isReplay, added, removed, modified) "
"arguments."
)

elif listener_type == "shift_aware":
if nargs == 1:
if replay_initial:
raise ValueError(
"ShiftObliviousListener does not support replay: ltype={} nargs={}".format(
listener_type, nargs
)
)

listener_adapter = jpy.get_type(
"io.deephaven.integrations.python.PythonListenerAdapter"
)
listener_adapter = listener_adapter(
description, t.j_table, retain, listener
)
elif nargs == 2:
listener_adapter = jpy.get_type(
"io.deephaven.integrations.python.PythonReplayListenerAdapter"
)
listener_adapter = listener_adapter(
description, t.j_table, retain, listener
)
else:
raise ValueError(
"Shift-aware listener must take 1 (update) or 2 (isReplay, update) arguments."
)

else:
raise ValueError(
"Unsupported listener type: ltype={}".format(listener_type)
)

handle = TableListenerHandle(t, listener_adapter)

def start():
if replay_initial:
listener_adapter.replay()

if start_listening:
handle.register()

if replay_initial:
_do_locked(start, lock_type=lock_type)
else:
start()

return handle
except Exception as e:
raise DHError(e, "failed to listen to the table.") from e
Loading

0 comments on commit 81a9758

Please sign in to comment.