Skip to content

Commit

Permalink
Don't keep hard references to Table (#2158)
Browse files Browse the repository at this point in the history
* Fix StreamToTableAdapter, don't keep hard reference to Table

* Refactor ApplicationServiceGrpcImpl, don't keep hard ref to table

Fixes #2130

* Undo docker-compose changes

* Execute multiple console scripts back-to-back

* review changes

* close() suggestion

Co-authored-by: Ryan Caudy <ryan@deephaven.io>
  • Loading branch information
devinrsmith and rcaudy authored Apr 3, 2022
1 parent 450784b commit 1e56e0d
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
Expand All @@ -27,6 +28,7 @@
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -35,7 +37,8 @@
/**
* Adapter for converting streams of data into columnar Deephaven {@link Table tables}.
*/
public class StreamToTableAdapter implements SafeCloseable, StreamConsumer, Runnable {
public class StreamToTableAdapter extends ReferenceCountedLivenessNode
implements SafeCloseable, StreamConsumer, Runnable {

private static final Logger log = LoggerFactory.getLogger(StreamToTableAdapter.class);

Expand All @@ -44,7 +47,7 @@ public class StreamToTableAdapter implements SafeCloseable, StreamConsumer, Runn
private final UpdateSourceRegistrar updateSourceRegistrar;
private final String name;

private final QueryTable table;
private final WeakReference<QueryTable> tableRef;
private final TrackingWritableRowSet rowSet;
private final SwitchColumnSource<?>[] switchSources;

Expand All @@ -61,13 +64,15 @@ public class StreamToTableAdapter implements SafeCloseable, StreamConsumer, Runn
/** A list of failures that have occurred. */
private List<Exception> enqueuedFailure;

private volatile QueryTable table;
private volatile Runnable shutdownCallback;
private volatile boolean alive = true;

public StreamToTableAdapter(@NotNull final TableDefinition tableDefinition,
@NotNull final StreamPublisher streamPublisher,
@NotNull final UpdateSourceRegistrar updateSourceRegistrar,
@NotNull final String name) {
super(false);
this.tableDefinition = tableDefinition;
this.streamPublisher = streamPublisher;
this.updateSourceRegistrar = updateSourceRegistrar;
Expand All @@ -91,12 +96,8 @@ public StreamToTableAdapter(@NotNull final TableDefinition tableDefinition,
setAttribute(Table.STREAM_TABLE_ATTRIBUTE, Boolean.TRUE);
addParentReference(StreamToTableAdapter.this);
}

@Override
public void destroy() {
StreamToTableAdapter.this.close();
}
};
tableRef = new WeakReference<>(table);
}

/**
Expand Down Expand Up @@ -238,12 +239,16 @@ private static Class<?> replacementType(Class<?> columnType) {
}

/**
* Return the stream table that this adapter is producing.
* Return the {@link Table#STREAM_TABLE_ATTRIBUTE stream} {@link Table table} that this adapter is producing, and
* ensure that this StreamToTableAdapter no longer enforces strong reachability of the result. May return
* {@code null} if invoked more than once and the initial caller does not enforce strong reachability of the result.
*
* @return the resultant stream table
* @return The resulting stream table
*/
public Table table() {
return table;
final QueryTable localTable = tableRef.get();
table = null;
return localTable;
}

@Override
Expand All @@ -267,15 +272,25 @@ public void close() {
}
}

@Override
public void destroy() {
close();
}

@Override
public void run() {
try {
doRefresh();
} catch (Exception e) {
log.error().append("Error refreshing ").append(StreamToTableAdapter.class.getSimpleName()).append('-')
.append(name).append(": ").append(e).endl();
table.notifyListenersOnError(e, null);
updateSourceRegistrar.removeSource(this);
final QueryTable localTable = tableRef.get();
if (localTable != null) {
localTable.notifyListenersOnError(e, null);
} else {
close();
}
}
}

Expand Down Expand Up @@ -312,10 +327,12 @@ private void doRefresh() {
if (capturedBufferSources == null) {
// null out our current values
for (int ii = 0; ii < switchSources.length; ++ii) {
// noinspection unchecked
switchSources[ii].setNewCurrent((ColumnSource) nullColumnSources[ii]);
}
} else {
for (int ii = 0; ii < switchSources.length; ++ii) {
// noinspection unchecked
switchSources[ii].setNewCurrent((ColumnSource) capturedBufferSources[ii]);
}
}
Expand All @@ -329,9 +346,14 @@ private void doRefresh() {
rowSet.removeRange(newSize, oldSize - 1);
}

table.notifyListeners(new TableUpdateImpl(RowSetFactory.flat(newSize),
RowSetFactory.flat(oldSize), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
final QueryTable localTable = tableRef.get();
if (localTable != null) {
localTable.notifyListeners(new TableUpdateImpl(RowSetFactory.flat(newSize),
RowSetFactory.flat(oldSize), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
} else {
close();
}
}

@SafeVarargs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package io.deephaven.client.examples;

import io.deephaven.client.impl.ConsoleSession;
import io.deephaven.client.impl.script.Changes;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

import java.nio.file.Path;
import java.util.List;

@Command(name = "execute-script", mixinStandardHelpOptions = true,
description = "Execute a script", version = "0.1.0")
class ExecuteScript extends ConsoleExampleBase {

@Parameters(arity = "1", paramLabel = "SCRIPT", description = "The script to send.")
Path script;
@Parameters(arity = "1+", paramLabel = "SCRIPT", description = "The script to send.")
List<Path> scripts;

@Override
protected void execute(ConsoleSession consoleSession) throws Exception {
System.out.println(toPrettyString(consoleSession.executeScript(script)));
for (Path path : scripts) {
final Changes changes = consoleSession.executeScript(path);
System.out.println(path);
System.out.println(toPrettyString(changes));
}
}

public static void main(String[] args) {
Expand Down
Loading

0 comments on commit 1e56e0d

Please sign in to comment.