From 9776e1d2dcf4ed175ac784a7ee316c7ef288e53e Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Mon, 15 Jul 2024 14:25:05 -0600 Subject: [PATCH] feat(api)!: Support merged listening on multiple tables (#5672) Fixes #5647 --------- Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com> Co-authored-by: Ryan Caudy --- .../python/PythonMergedListenerAdapter.java | 96 ++++++ .../integrations/python/PythonUtils.java | 12 + .../engine/table/impl/MergedListener.java | 5 + py/server/deephaven/table_listener.py | 323 +++++++++++++----- py/server/tests/test_table_listener.py | 165 ++++++++- py/server/tests/test_udf_scalar_args.py | 6 +- 6 files changed, 519 insertions(+), 88 deletions(-) create mode 100644 Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java new file mode 100644 index 00000000000..215aec0b81a --- /dev/null +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java @@ -0,0 +1,96 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.integrations.python; + +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.TableUpdate; +import io.deephaven.engine.table.impl.ListenerRecorder; +import io.deephaven.engine.table.impl.MergedListener; +import io.deephaven.engine.table.impl.TableUpdateImpl; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.annotations.ScriptApi; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jpy.PyObject; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Stream; + +/** + * A Deephaven merged listener which fires when any of its bound listener recorders has updates and all of its + * dependencies have been satisfied. The listener then invokes the Python listener object. + * + * The Python listener object must be a Python MergedListener instance that provides a "_process" method implementation + * with no argument. + */ +@ScriptApi +public class PythonMergedListenerAdapter extends MergedListener { + private final PyObject pyCallable; + + /** + * Create a Python merged listener. + * + * @param recorders The listener recorders to which this listener will subscribe. + * @param dependencies The tables that must be satisfied before this listener is executed. + * @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may + * be null. + * @param pyObjectIn Python listener object. + */ + private PythonMergedListenerAdapter( + @NotNull ListenerRecorder[] recorders, + @Nullable NotificationQueue.Dependency[] dependencies, + @Nullable String listenerDescription, + @NotNull PyObject pyObjectIn) { + super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null); + Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this)); + this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn); + } + + public static PythonMergedListenerAdapter create( + @NotNull ListenerRecorder[] recorders, + @Nullable NotificationQueue.Dependency[] dependencies, + @Nullable String listenerDescription, + @NotNull PyObject pyObjectIn) { + if (recorders.length < 2) { + throw new IllegalArgumentException("At least two listener recorders must be provided"); + } + + final NotificationQueue.Dependency[] allItems = + Stream.concat(Arrays.stream(recorders), Arrays.stream(dependencies)) + .filter(Objects::nonNull) + .toArray(NotificationQueue.Dependency[]::new); + + final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems); + + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn); + } + } + + public ArrayList currentRowsAsUpdates() { + final ArrayList updates = new ArrayList<>(); + for (ListenerRecorder recorder : getRecorders()) { + final TableUpdate update = new TableUpdateImpl( + recorder.getParent().getRowSet().copy(), + RowSetFactory.empty(), + RowSetFactory.empty(), + RowSetShiftData.EMPTY, + ModifiedColumnSet.EMPTY); + updates.add(update); + } + return updates; + } + + @Override + protected void process() { + pyCallable.call("__call__"); + } +} diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java index 5d3d861fa21..6ab80722f61 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java @@ -23,6 +23,18 @@ static PyObject pyListenerFunc(final PyObject pyObject) { return pyCallable(pyObject, "on_update"); } + /** + * Gets the python function that should be called by a merged listener. The input can be either (1) a callable or + * (2) an object which provides an "_process" method. + * + * @param pyObject python listener object. This should either be a callable or an object which provides an + * "_process" method. + * @return python function that should be called by a merged listener. + * @throws IllegalArgumentException python listener object is not a valid listener. + */ + static PyObject pyMergeListenerFunc(final PyObject pyObject) { + return pyCallable(pyObject, "_process"); + } /** * Creates a callable PyObject, either using method.apply() or __call__(), if the pyObjectIn has such methods diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java index fd5a723736e..3c7e63883aa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java @@ -48,6 +48,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific AtomicLongFieldUpdater.newUpdater(MergedListener.class, "lastCompletedStep"); private final UpdateGraph updateGraph; + private final Iterable recorders; private final Iterable dependencies; private final String listenerDescription; @@ -91,6 +92,10 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } + protected Iterable getRecorders() { + return recorders; + } + public final void notifyOnUpstreamError( @NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) { notifyInternal(upstreamError, errorSourceEntry); diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index c7db8af827e..b8d406673d7 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import wraps from inspect import signature -from typing import Callable, Union, List, Generator, Dict, Literal, Sequence +from typing import Callable, Union, List, Generator, Dict, Literal, Sequence, Optional import jpy import numpy @@ -14,13 +14,15 @@ from deephaven import DHError from deephaven import update_graph from deephaven._wrapper import JObjectWrapper -from deephaven.jcompat import to_sequence +from deephaven.jcompat import to_sequence, j_list_to_list from deephaven.table import Table -from deephaven._table_reader import _table_reader_chunk_dict, _table_reader_all_dict +from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict from deephaven.update_graph import UpdateGraph _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") _JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate") +_JListenerRecorder = jpy.get_type("io.deephaven.engine.table.impl.ListenerRecorder") +_JPythonMergedListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonMergedListenerAdapter") class TableUpdate(JObjectWrapper): """A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the @@ -30,6 +32,8 @@ class TableUpdate(JObjectWrapper): def __init__(self, table: Table, j_table_update: jpy.JType): self.table = table self.j_table_update = j_table_update + # make sure we always use the _JTableUpdate interface and not the implementations + self.j_table_update = jpy.cast(j_table_update, _JTableUpdate) @property def j_object(self) -> jpy.JType: @@ -45,10 +49,10 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: Returns: a dict """ - if not self.j_table_update.added: + if not (added := self.j_table_update.added()): return {} - return _table_reader_all_dict(table=self.table, cols=cols, row_set= self.j_table_update.added.asRowSet(), + return _table_reader_all_dict(table=self.table, cols=cols, row_set= added.asRowSet(), prev=False, to_numpy=True) def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ @@ -63,10 +67,10 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G Returns: a generator """ - if not self.j_table_update.added: + if not (added := self.j_table_update.added()): return (_ for _ in ()) - return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=added.asRowSet(), chunk_size=chunk_size, prev=False) def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -79,10 +83,10 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray Returns: a dict """ - if not self.j_table_update.removed: + if not (removed := self.j_table_update.removed()): return {} - return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + return _table_reader_all_dict(table=self.table, cols=cols, row_set=removed.asRowSet(), prev=True) def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ @@ -97,10 +101,10 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Returns: a generator """ - if not self.j_table_update.removed: + if not (removed := self.j_table_update.removed()): return (_ for _ in ()) - return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=removed.asRowSet(), chunk_size=chunk_size, prev=True) def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -113,10 +117,10 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra Returns: a dict """ - if not self.j_table_update.modified: + if not (modified := self.j_table_update.modified()): return {} - return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _table_reader_all_dict(table=self.table, cols=cols, row_set=modified.asRowSet(), prev=False, to_numpy=True) def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ @@ -131,10 +135,10 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - Returns: a generator """ - if not self.j_table_update.modified: + if not (modified := self.j_table_update.modified()): return (_ for _ in ()) - return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _table_reader_chunk_dict(self.table, cols=cols, row_set=modified.asRowSet(), chunk_size=chunk_size, prev=False) def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -147,10 +151,10 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n Returns: a dict """ - if not self.j_table_update.modified: + if not (modified := self.j_table_update.modified()): return {} - return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _table_reader_all_dict(table=self.table, cols=cols, row_set=modified.asRowSet(), prev=True, to_numpy=True) def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ @@ -165,10 +169,10 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No Returns: a generator """ - if not self.j_table_update.modified: + if not (modified := self.j_table_update.modified()): return (_ for _ in ()) - return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _table_reader_chunk_dict(self.table, cols=cols, row_set=modified.asRowSet(), chunk_size=chunk_size, prev=True) @property @@ -178,40 +182,11 @@ def shifted(self): @property def modified_columns(self) -> List[str]: """The list of modified columns in this update.""" - cols = self.j_table_update.modifiedColumnSet.dirtyColumnNames() + cols = self.j_table_update.modifiedColumnSet().dirtyColumnNames() return list(cols) if cols else [] -def _do_locked(ug: Union[UpdateGraph, Table], f: Callable, lock_type: Literal["shared","exclusive"] = "shared") -> \ - None: - """Executes a function while holding the UpdateGraph (UG) lock. Holding the UG lock - ensures that the contents of a table will not change during a computation, but holding - the lock also prevents table updates from happening. The lock should be held for as little - time as possible. - - Args: - ug (Union[UpdateGraph, Table]): The Update Graph (UG) or a table-like object. - f (Callable): callable to execute while holding the UG lock, could be function or an object with an 'apply' - attribute which is callable - lock_type (str): UG lock type, valid values are "exclusive" and "shared". "exclusive" allows only a single - reader or writer to hold the lock. "shared" allows multiple readers or a single writer to hold the lock. - Raises: - ValueError - """ - if isinstance(ug, Table): - ug = ug.update_graph - - if lock_type == "exclusive": - with update_graph.exclusive_lock(ug): - f() - elif lock_type == "shared": - with update_graph.shared_lock(ug): - f() - else: - raise ValueError(f"Unsupported lock type: lock_type={lock_type}") - - class TableListener(ABC): """An abstract table listener class that should be subclassed by any user table listener class.""" @@ -228,7 +203,7 @@ def _listener_wrapper(table: Table): table (Table): the table to listen for updates. """ - def decorator(listener: Callable): + def decorator(listener: Callable[[TableUpdate, bool], None]): @wraps(listener) def wrapper(update, *args): t_update = TableUpdate(table=table, j_table_update=update) @@ -239,7 +214,7 @@ def wrapper(update, *args): return decorator -def _wrap_listener_func(t: Table, listener: Callable): +def _wrap_listener_func(t: Table, listener: Callable[[TableUpdate, bool], None]): n_params = len(signature(listener).parameters) if n_params != 2: raise ValueError("listener function must have 2 (update, is_replay) parameters.") @@ -258,7 +233,7 @@ class TableListenerHandle(JObjectWrapper): """A handle to manage a table listener's lifecycle.""" j_object_type = _JPythonReplayListenerAdapter - def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None, + def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, dependencies: Union[Table, Sequence[Table]] = None): """Creates a new table listener handle with dependencies. @@ -269,14 +244,14 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti The callable or the on_update method must have the following signatures. * (update: TableUpdate, is_replay: bool): support replaying the initial table snapshot and normal table updates The 'update' parameter is an object that describes the table update; - The 'is_replay' parameter is used only by replay listeners, it is set to 'true' when replaying the initial - snapshot and 'false' during normal updates. + The 'is_replay' parameter is used only by replay listeners, it is set to 'True' when replaying the initial + snapshot and 'False' during normal updates. Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. @@ -294,33 +269,35 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti Raises: DHError """ + if not t.is_refreshing: + raise DHError(message="table must be a refreshing table.") + self.t = t self.description = description self.dependencies = to_sequence(dependencies) - if callable(listener): - self.listener_wrapped = _wrap_listener_func(t, listener) - elif isinstance(listener, TableListener): + if isinstance(listener, TableListener): self.listener_wrapped = _wrap_listener_obj(t, listener) + elif callable(listener): + self.listener_wrapped = _wrap_listener_func(t, listener) else: raise DHError(message="listener is neither callable nor TableListener object") try: - self.listener = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies) + self.listener_adapter = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies) except Exception as e: raise DHError(e, "failed to create a table listener.") from e self.started = False @property def j_object(self) -> jpy.JType: - return self.listener + return self.listener_adapter - def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None: + def start(self, do_replay: bool = False) -> None: """Start the listener by registering it with the table and listening for updates. Args: do_replay (bool): whether to replay the initial snapshot of the table, default is False - replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'. Raises: DHError @@ -329,18 +306,13 @@ def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusi raise RuntimeError("Attempting to start an already started listener..") try: - def _start(): + with update_graph.auto_locking_ctx(self.t): if do_replay: - self.listener.replay() - - self.t.j_table.addUpdateListener(self.listener) + self.listener_adapter.replay() - if do_replay: - _do_locked(self.t, _start, lock_type=replay_lock) - else: - _start() + self.t.j_table.addUpdateListener(self.listener_adapter) except Exception as e: - raise DHError(e, "failed to listen to the table changes.") from e + raise DHError(e, "failed to listen to the table changes.") from e self.started = True @@ -349,13 +321,12 @@ def stop(self) -> None: if not self.started: return - self.t.j_table.removeUpdateListener(self.listener) + self.t.j_table.removeUpdateListener(self.listener_adapter) self.started = False -def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False, - replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\ - -> TableListenerHandle: +def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False, + dependencies: Union[Table, Sequence[Table]] = None) -> TableListenerHandle: """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen for table updates. @@ -366,11 +337,10 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None do_replay (bool): whether to replay the initial snapshot of the table, default is False - replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive' dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. A refreshing table is considered to be satisfied if all possible updates to the table have been processed in the current update graph cycle. A static table is always considered to be satisfied. If a specified @@ -391,5 +361,202 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str """ table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener, description=description) - table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock) + table_listener_handle.start(do_replay=do_replay) return table_listener_handle + + +class _ListenerRecorder (JObjectWrapper): + """A ListenerRecorder object records the table updates and notifies the associated MergedListener that a change + has occurred.""" + + j_object_type = _JListenerRecorder + + @property + def j_object(self) -> jpy.JType: + return self.j_listener_recorder + + def __init__(self, table: Table): + if not table.is_refreshing: + raise DHError(message="table must be a refreshing table.") + + self.j_listener_recorder = _JListenerRecorder("Python Wrapped Listener recorder", table.j_table, None) + self.table = table + + def table_update(self) -> Optional[TableUpdate]: + """Gets the table update from the listener recorder. If there is no update in the current update graph cycle, + returns None. + + Returns: + a TableUpdate or None + """ + j_table_update = self.j_listener_recorder.getUpdate() + return TableUpdate(self.table, j_table_update) if j_table_update else None + + +class MergedListener(ABC): + """An abstract multi-table listener class that should be subclassed by any user multi-table listener class.""" + + @abstractmethod + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + """The required method on a listener object that receives table updates from the + tables that are listened to. + """ + ... + + +class MergedListenerHandle(JObjectWrapper): + """A handle to manage a merged listener's lifecycle.""" + j_object_type = _JPythonMergedListenerAdapter + + @property + def j_object(self) -> jpy.JType: + return self.merged_listener_adapter + + def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate], bool], None], MergedListener], + description: str = None, dependencies: Union[Table, Sequence[Table]] = None): + """Creates a new MergedListenerHandle with the provided listener recorders and dependencies. + + Table change events are processed by 'listener', which can be either + (1) a callable (e.g. function) or + (2) an instance of MergedListener type which provides an "on_update" method. + The callable or the on_update method must have the following signature. + *(updates: Dict[Table, TableUpdate], is_replay: bool): support replaying the initial table snapshots and normal table updates + The 'updates' parameter is a dictionary of Table to TableUpdate; + The 'is_replay' parameter is used only by replay listeners, it is set to 'True' when replaying the initial + snapshots and 'False' during normal updates. + + + Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. + + Args: + tables (Sequence[Table]): tables to listen to + listener (Union[Callable[[Dict[Table, TableUpdate], bool], None], MergedListener]): listener to process table updates + from the tables. + description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. + + Raises: + DHError + """ + if len(tables) < 2: + raise DHError(message="A merged listener must listen to at least 2 refreshing tables.") + + self.tables = tables + self.listener_recorders = [_ListenerRecorder(t) for t in tables] + + self.dependencies = dependencies + + if isinstance(listener, MergedListener): + self.listener = listener.on_update + else: + self.listener = listener + n_params = len(signature(self.listener).parameters) + if n_params != 2: + raise ValueError("merged listener function must have 2 parameters (updates, is_replay).") + + + try: + self.merged_listener_adapter = _JPythonMergedListenerAdapter.create( + to_sequence(self.listener_recorders), + to_sequence(self.dependencies), + description, + self) + self.started = False + except Exception as e: + raise DHError(e, "failed to create a merged listener adapter.") from e + + def _process(self) -> None: + """Process the table updates from the listener recorders. """ + self.listener({lr.table: lr.table_update() for lr in self.listener_recorders}, False) + + def start(self, do_replay: bool = False) -> None: + """Start the listener by registering it with the tables and listening for updates. + + Args: + do_replay (bool): whether to replay the initial snapshots of the tables, default is False + + Raises: + DHError + """ + if self.started: + raise RuntimeError("Attempting to start an already started merged listener..") + + try: + # self.tables[0] is guaranteed to be a refreshing table, the lock is needed to add all the listener recorders + # on the same update graph cycle and if replay is requested, the initial snapshots of the tables are all + # taken on the same update graph cycle as well. + with update_graph.auto_locking_ctx(self.tables[0]): + if do_replay: + j_replay_updates = self.merged_listener_adapter.currentRowsAsUpdates() + replay_updates = {t: TableUpdate(t, tu) for t, tu in zip(self.tables, j_list_to_list(j_replay_updates))} + try: + self.listener(replay_updates, True) + finally: + for replay_update in replay_updates.values(): + replay_update.j_object.release() + + for lr in self.listener_recorders: + lr.table.j_table.addUpdateListener(lr.j_listener_recorder) + except Exception as e: + raise DHError(e, "failed to listen to the table changes.") from e + + self.started = True + + + def stop(self) -> None: + """Stop the listener.""" + if not self.started: + return + + # self.tables[0] is guaranteed to be a refreshing table, the lock is needed to remove all the listener recorders + # on the same update graph cycle + with update_graph.auto_locking_ctx(self.tables[0]): + for lr in self.listener_recorders: + lr.table.j_table.removeUpdateListener(lr.j_listener_recorder) + self.started = False + + +def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener], + do_replay: bool = False, description: str = None, dependencies: Union[Table, Sequence[Table]] = None)\ + -> MergedListenerHandle: + """This is a convenience function that creates a MergedListenerHandle object and immediately starts it to + listen for table updates. + + The function returns the created MergedListenerHandle object whose 'stop' method can be called to stop + listening. If it goes out of scope and is garbage collected, the listener will stop receiving any table updates. + + Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. + + Args: + tables (Sequence[Table]): tables to listen to. + listener (Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener]): listener to process table updates + from the tables. + description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry + description, default is None + do_replay (bool): whether to replay the initial snapshots of the tables, default is False + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. + """ + merged_listener_handle = MergedListenerHandle(tables=tables, listener=listener, + description=description, dependencies=dependencies) + merged_listener_handle.start(do_replay=do_replay) + return merged_listener_handle diff --git a/py/server/tests/test_table_listener.py b/py/server/tests/test_table_listener.py index bded80c1786..db570b77414 100644 --- a/py/server/tests/test_table_listener.py +++ b/py/server/tests/test_table_listener.py @@ -4,17 +4,18 @@ import time import unittest -from typing import List, Union +from typing import List, Union, Optional, Dict import numpy import jpy -from deephaven import time_table, new_table, input_table, DHError +from deephaven import time_table, new_table, input_table, DHError, empty_table from deephaven.column import bool_col, string_col from deephaven.experimental import time_window from deephaven.jcompat import to_sequence from deephaven.table import Table -from deephaven.table_listener import listen, TableListener, TableListenerHandle +from deephaven.table_listener import listen, TableListener, TableListenerHandle, MergedListener, TableUpdate, \ + MergedListenerHandle, merged_listen from deephaven.execution_context import get_exec_ctx from deephaven.update_graph import exclusive_lock from tests.testbase import BaseTestCase @@ -22,7 +23,7 @@ _JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors") class TableUpdateRecorder: - def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[str]] = None): + def __init__(self, table: Optional[Table] = None, chunk_size: int = None, cols: Union[str, List[str]] = None): self.table = table self.chunk_size = chunk_size self.cols = cols @@ -34,7 +35,10 @@ def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[s self.replays = [] self.modified_columns_list = [] - def record(self, update, is_replay): + def record(self, update: TableUpdate, is_replay: bool): + if not update: + return + if self.chunk_size is None: self.added.append(update.added()) self.removed.append(update.removed()) @@ -293,14 +297,14 @@ def listener_func(update, is_replay): with self.subTest("do_replay=True, replay_lock='exclusive'"): table_update_recorder = TableUpdateRecorder(self.test_table) - table_listener_handle.start(do_replay=True, replay_lock="exclusive") + table_listener_handle.start(do_replay=True) ensure_ugp_cycles(table_update_recorder, cycles=3) table_listener_handle.stop() self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False) with self.subTest("do_replay=True, replay_lock='shared'"): table_update_recorder = TableUpdateRecorder(self.test_table) - table_listener_handle.start(do_replay=True, replay_lock="shared") # noqa + table_listener_handle.start(do_replay=True) # noqa ensure_ugp_cycles(table_update_recorder, cycles=3) table_listener_handle.stop() self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False) @@ -327,6 +331,153 @@ def listener_func(update, is_replay): with self.assertRaises(DHError): table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table) + def test_merged_listener_obj(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + class TestMergedListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + for update in updates.values(): + tur.record(update, is_replay) + + tml = TestMergedListener() + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([t1, t2, t3], tml) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen([t1, t2, t3], tml) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + def test_merged_listener_func(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + def test_ml_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + if updates[t1] or updates[t3]: + tur.record(updates[t1], is_replay) + + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([t1, t2, t3], test_ml_func) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen([t1, t2, t3], test_ml_func) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + def test_merged_listener_with_deps(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + dep_table = time_table("PT00:00:05").update("X = i % 11") + ec = get_exec_ctx() + + tur = TableUpdateRecorder() + j_arrays = [] + class TestMergedListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + if updates[t1] and updates[t2]: + tur.record(updates[t2], is_replay) + + with ec: + t = dep_table.view(["Y = i % 8"]) + j_arrays.append(_JColumnVectors.of(t.j_table, "Y").copyToArray()) + + tml = TestMergedListener() + mlh = MergedListenerHandle(tables=[t1, t2, t3], listener=tml, dependencies=dep_table) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + self.assertTrue(len(j_arrays) > 0 and all([len(ja) > 0 for ja in j_arrays])) + + def test_merged_listener_error(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + + def test_ml_func(updates: Dict[Table, TableUpdate]) -> None: + pass + + with self.assertRaises(DHError) as cm: + mlh = MergedListenerHandle([t1], test_ml_func) + self.assertIn("at least 2 refreshing tables", str(cm.exception)) + + et = empty_table(1) + with self.assertRaises(DHError) as cm: + mlh = merged_listen([t1, et], test_ml_func) + self.assertIn("must be a refreshing table", str(cm.exception)) + + def test_merged_listener_replay(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + class TestMergedListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + for update in updates.values(): + tur.record(update, is_replay) + + tml = TestMergedListener() + t1.await_update() + t2.await_update() + t3.await_update() + with self.subTest("MergedListener - replay"): + tur = TableUpdateRecorder() + mlh = merged_listen([t1, t2, t3], tml, do_replay=True) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start(do_replay=True) + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + self.assertEqual(tur.replays.count(True), 2 * 3) + + def test_ml_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + tur.record(updates[t3], is_replay) + + with self.subTest("Direct Handle - replay"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([t1, t2, t3], test_ml_func) + mlh.start(do_replay=True) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start(do_replay=True) + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + self.assertEqual(tur.replays.count(True), 2) + if __name__ == "__main__": unittest.main() diff --git a/py/server/tests/test_udf_scalar_args.py b/py/server/tests/test_udf_scalar_args.py index 1eebdd914a2..408b58486aa 100644 --- a/py/server/tests/test_udf_scalar_args.py +++ b/py/server/tests/test_udf_scalar_args.py @@ -609,9 +609,9 @@ def f(p1: float, p2: np.float64) -> bool: dv = 0.05 with warnings.catch_warnings(record=True) as w: t = empty_table(10).update("X = f(dv, dv)") - self.assertEqual(w[-1].category, UserWarning) - self.assertRegex(str(w[-1].message), "numpy scalar type.*is used") - self.assertEqual(10, t.to_string().count("true")) + self.assertEqual(w[-1].category, UserWarning) + self.assertRegex(str(w[-1].message), "numpy scalar type.*is used") + self.assertEqual(10, t.to_string().count("true")) if __name__ == "__main__":