Skip to content

Commit

Permalink
Charles' Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 2, 2023
1 parent d467857 commit 283b824
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.base.Pair;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.rowset.RowSet;
Expand Down Expand Up @@ -34,9 +33,12 @@

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

public abstract class SelectAndViewAnalyzer implements LogOutputAppendable {
private static final Consumer<ColumnSource<?>> NOOP = ignore -> {};

public enum Mode {
VIEW_LAZY, VIEW_EAGER, SELECT_STATIC, SELECT_REFRESHING, SELECT_REDIRECTED_REFRESHING, SELECT_REDIRECTED_STATIC
}
Expand Down Expand Up @@ -100,8 +102,8 @@ public static SelectAndViewAnalyzerWrapper create(
FormulaColumn shiftColumn = null;
boolean shiftColumnHasPositiveOffset = false;

final HashSet<String> resultColumns = flattenedResult ? new HashSet<>() : null;
final HashMap<String, ColumnSource<?>> resultAlias = flattenedResult ? new HashMap<>() : null;
final HashSet<String> resultColumns = new HashSet<>();
final HashMap<String, ColumnSource<?>> resultAlias = new HashMap<>();
for (final SelectColumn sc : selectColumns) {
if (remainingCols != null) {
remainingCols.add(sc);
Expand All @@ -125,11 +127,9 @@ public static SelectAndViewAnalyzerWrapper create(
sc.initInputs(rowSet, analyzer.getAllColumnSources());
}

if (flattenedResult) {
resultColumns.add(sc.getName());
// this shadows any known alias
resultAlias.remove(sc.getName());
}
resultColumns.add(sc.getName());
// this shadows any known alias
resultAlias.remove(sc.getName());

final Stream<String> allDependencies =
Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream());
Expand Down Expand Up @@ -171,11 +171,8 @@ public static SelectAndViewAnalyzerWrapper create(
realColumn = null;
}

if (shouldPreserve(sc)) {
// this must be a source column to be preserved
Assert.neqNull(realColumn, "realColumn");

boolean sourceIsNew = resultColumns != null && resultColumns.contains(realColumn.getSourceName());
if (realColumn != null && shouldPreserve(sc)) {
boolean sourceIsNew = resultColumns.contains(realColumn.getSourceName());
if (!sourceIsNew && numberOfInternallyFlattenedColumns > 0) {
// we must preserve this column, but have already created an analyzer for the internally flattened
// column, therefore must start over without permitting internal flattening
Expand All @@ -193,8 +190,8 @@ public static SelectAndViewAnalyzerWrapper create(
continue;
}

if (flattenedResult && realColumn != null) {
// this could be a duplicate of a previously flattened column
// look for an existing alias that can be preserved instead
if (realColumn != null) {
final ColumnSource<?> alias = resultAlias.get(realColumn.getSourceName());
if (alias != null) {
analyzer = analyzer.createLayerForPreserve(
Expand All @@ -203,16 +200,22 @@ public static SelectAndViewAnalyzerWrapper create(
}
}

// if this is a source column, then results are eligible for aliasing
final Consumer<ColumnSource<?>> maybeCreateAlias = realColumn == null ? NOOP
: cs -> resultAlias.put(realColumn.getSourceName(), cs);

final long targetDestinationCapacity =
rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1);
switch (mode) {
case VIEW_LAZY: {
final ColumnSource<?> viewCs = sc.getLazyView();
maybeCreateAlias.accept(viewCs);
analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder);
break;
}
case VIEW_EAGER: {
final ColumnSource<?> viewCs = sc.getDataView();
maybeCreateAlias.accept(viewCs);
analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder);
break;
}
Expand All @@ -222,12 +225,7 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> scs =
flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity)
: sc.newDestInstance(targetDestinationCapacity);

if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) {
// this source column a candidate for preservation if referenced again
resultAlias.put(realColumn.getSourceName(), scs);
}

maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null,
distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult);
if (flattenedResult) {
Expand All @@ -239,6 +237,7 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> underlyingSource = sc.newDestInstance(rowSet.size());
final WritableColumnSource<?> scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.size());
maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs,
underlyingSource, distinctDeps, mcsBuilder, true, false, false);
break;
Expand All @@ -255,6 +254,7 @@ public static SelectAndViewAnalyzerWrapper create(
scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.intSize());
}
maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs,
underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false, false);
break;
Expand Down Expand Up @@ -309,10 +309,7 @@ private static boolean hasConstantValue(final SelectColumn sc) {
}

private static boolean shouldPreserve(final SelectColumn sc) {
if (!(sc instanceof SourceColumn)
&& (!(sc instanceof SwitchColumn) || !(((SwitchColumn) sc).getRealColumn() instanceof SourceColumn))) {
return false;
}
// we already know sc is a SourceColumn or switches to a SourceColumn
final ColumnSource<?> sccs = sc.getDataView();
return sccs instanceof InMemoryColumnSource && ((InMemoryColumnSource) sccs).isInMemory()
&& !Vector.class.isAssignableFrom(sc.getReturnedType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,8 @@ public void testStaticSelectPreserveAlreadyFlattenedColumns() {
Assert.assertEquals(rowKey * 2, baz.getLong(rowKey));
});

Assert.assertEquals(foo, bar);
Assert.assertEquals(foo, baz);
Assert.assertSame(foo, bar);
Assert.assertSame(foo, baz);
}

@Test
Expand All @@ -1155,9 +1155,9 @@ public void testStaticSelectPreserveColumn() {
});

// These columns were preserved and no flattening occurred.
Assert.assertEquals(orig, foo);
Assert.assertEquals(orig, bar);
Assert.assertEquals(orig, baz);
Assert.assertSame(orig, foo);
Assert.assertSame(orig, bar);
Assert.assertSame(orig, baz);
}

@Test
Expand All @@ -1181,8 +1181,8 @@ public void testStaticSelectFlattenNotReusedWithRename() {
}
});

Assert.assertNotEquals(orig, foo); // this column was flattened
Assert.assertNotEquals(newI, baz); // vector columns cannot be preserved; so this should be a copy
Assert.assertNotSame(orig, foo); // this column was flattened
Assert.assertNotSame(newI, baz); // vector columns cannot be preserved; so this should be a copy
}

@Test
Expand All @@ -1208,8 +1208,32 @@ public void testStaticSelectRevertInternalFlatten() {
});

// Note that Foo is still being "selected" and therefore "brought into memory"
Assert.assertNotEquals(foo, source.getColumnSource("J"));
Assert.assertEquals(bar, source.getColumnSource("I"));
Assert.assertEquals(baz, foo);
Assert.assertNotSame(foo, source.getColumnSource("J"));
Assert.assertSame(bar, source.getColumnSource("I"));
Assert.assertSame(baz, foo);
}

@Test
public void testAliasColumnSelectRefreshing() {
final long size = 100;
final MutableInt numCalls = new MutableInt();
QueryScope.addParam("counter", numCalls);
final QueryTable source = testRefreshingTable(RowSetFactory.flat(size).toTracking());
final Table result = source.update("id = counter.getAndIncrement()")
.select("id_a = id", "id_b = id");

final ColumnSource<?> id_a = result.getColumnSource("id_a");
final ColumnSource<?> id_b = result.getColumnSource("id_b");
Assert.assertSame(id_a, id_b);
Assert.assertEquals(numCalls.intValue(), size);

final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.runWithinUnitTestCycle(() -> {
final WritableRowSet added = RowSetFactory.fromRange(size, size * 2 - 1);
addToTable(source, added);
source.notifyListeners(added, i(), i());
});

Assert.assertEquals(numCalls.intValue(), 2 * size);
}
}

0 comments on commit 283b824

Please sign in to comment.