Skip to content

Commit

Permalink
feat(api)!: Support merged listening on multiple tables (deephaven#5672)
Browse files Browse the repository at this point in the history
Fixes deephaven#5647

---------

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent e1c4d9b commit 9776e1d
Show file tree
Hide file tree
Showing 6 changed files with 519 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;

/**
* A Deephaven merged listener which fires when any of its bound listener recorders has updates and all of its
* dependencies have been satisfied. The listener then invokes the Python listener object.
*
* The Python listener object must be a Python MergedListener instance that provides a "_process" method implementation
* with no argument.
*/
@ScriptApi
public class PythonMergedListenerAdapter extends MergedListener {
private final PyObject pyCallable;

/**
* Create a Python merged listener.
*
* @param recorders The listener recorders to which this listener will subscribe.
* @param dependencies The tables that must be satisfied before this listener is executed.
* @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may
* be null.
* @param pyObjectIn Python listener object.
*/
private PythonMergedListenerAdapter(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null);
Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this));
this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn);
}

public static PythonMergedListenerAdapter create(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
if (recorders.length < 2) {
throw new IllegalArgumentException("At least two listener recorders must be provided");
}

final NotificationQueue.Dependency[] allItems =
Stream.concat(Arrays.stream(recorders), Arrays.stream(dependencies))
.filter(Objects::nonNull)
.toArray(NotificationQueue.Dependency[]::new);

final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems);

try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn);
}
}

public ArrayList<TableUpdate> currentRowsAsUpdates() {
final ArrayList<TableUpdate> updates = new ArrayList<>();
for (ListenerRecorder recorder : getRecorders()) {
final TableUpdate update = new TableUpdateImpl(
recorder.getParent().getRowSet().copy(),
RowSetFactory.empty(),
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY);
updates.add(update);
}
return updates;
}

@Override
protected void process() {
pyCallable.call("__call__");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ static PyObject pyListenerFunc(final PyObject pyObject) {
return pyCallable(pyObject, "on_update");
}

/**
* Gets the python function that should be called by a merged listener. The input can be either (1) a callable or
* (2) an object which provides an "_process" method.
*
* @param pyObject python listener object. This should either be a callable or an object which provides an
* "_process" method.
* @return python function that should be called by a merged listener.
* @throws IllegalArgumentException python listener object is not a valid listener.
*/
static PyObject pyMergeListenerFunc(final PyObject pyObject) {
return pyCallable(pyObject, "_process");
}

/**
* Creates a callable PyObject, either using method.apply() or __call__(), if the pyObjectIn has such methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
AtomicLongFieldUpdater.newUpdater(MergedListener.class, "lastCompletedStep");

private final UpdateGraph updateGraph;

private final Iterable<? extends ListenerRecorder> recorders;
private final Iterable<NotificationQueue.Dependency> dependencies;
private final String listenerDescription;
Expand Down Expand Up @@ -91,6 +92,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

protected Iterable<? extends ListenerRecorder> getRecorders() {
return recorders;
}

public final void notifyOnUpstreamError(
@NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) {
notifyInternal(upstreamError, errorSourceEntry);
Expand Down
Loading

0 comments on commit 9776e1d

Please sign in to comment.