diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java index 711b2aae192..bde0c7694b6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java @@ -234,10 +234,11 @@ ModifiedColumnSet[] getInputModifiedColumnSets(QueryTable input) { * initialization. * * @param resultTable The result {@link QueryTable} after initialization + * @param startingDestinationsCount The number of used destinations at the beginning of this step */ - void propagateInitialStateToOperators(@NotNull final QueryTable resultTable) { + void propagateInitialStateToOperators(@NotNull final QueryTable resultTable, final int startingDestinationsCount) { for (final IterativeChunkedAggregationOperator operator : operators) { - operator.propagateInitialState(resultTable); + operator.propagateInitialState(resultTable, startingDestinationsCount); } } @@ -266,7 +267,7 @@ UnaryOperator[] initializeRefreshing(@NotNull final QueryTabl /** * Allow all operators to reset any per-step internal state. Note that the arguments to this method should not be * mutated in any way. - * + * * @param upstream The upstream {@link TableUpdateImpl} * @param startingDestinationsCount The number of used destinations at the beginning of this step */ @@ -280,7 +281,7 @@ void resetOperatorsForStep(@NotNull final TableUpdate upstream, final int starti * Allow all operators to perform any internal state keeping needed for destinations that were added (went from 0 * keys to > 0), removed (went from > 0 keys to 0), or modified (keys added or removed, or keys modified) by * this iteration. Note that the arguments to this method should not be mutated in any way. 
- * + * * @param downstream The downstream {@link TableUpdate} (which does not have its {@link ModifiedColumnSet} * finalized yet) * @param newDestinations New destinations added on this update diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BitmapRandomBuilder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BitmapRandomBuilder.java new file mode 100644 index 00000000000..2ded5be0150 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BitmapRandomBuilder.java @@ -0,0 +1,96 @@ +package io.deephaven.engine.table.impl.by; + +import io.deephaven.engine.rowset.RowSetBuilderRandom; +import io.deephaven.engine.rowset.RowSetBuilderSequential; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.WritableRowSet; + +import java.util.Arrays; + +/** + * The output RowSet of an aggregation is fairly special. It is always from zero to the number of output rows, and while + * modifying states we randomly add rows to it, potentially touching the same state many times. The normal index random + * builder does not guarantee those values are de-duplicated and requires O(lg n) operations for each insertion and + * building the RowSet. + *

+ * This version is O(1) for updating a modified slot, then linear in the number of output positions (not the number of + * result values) to build the RowSet. The memory usage is 1 bit per output position, whereas the standard builder uses + * 128 bits per used value (though it may collapse adjacent ranges when they are modified back-to-back). For random + * access patterns, this version will be more efficient; for friendly patterns the default random builder is likely + * more efficient. + * <p>

+ * We also know that we will only modify the rows that existed when we start, so that we can clamp the maximum key for + * the builder to the maximum output position without loss of fidelity. + */ +public class BitmapRandomBuilder implements RowSetBuilderRandom { + + /** + * An upper bound on {@code lastUsed}. That is, the highest bit index that may be used in {@code bitset}. + */ + final int maxKey; + + /** + * The lowest set bit index in {@code bitset}. + */ + int firstUsed = Integer.MAX_VALUE; + + /** + * The highest set bit index in {@code bitset}. + */ + int lastUsed = -1; + + /** + * The bitset itself. + */ + long[] bitset; + + public BitmapRandomBuilder(int maxKey) { + this.maxKey = maxKey; + } + + private static int rowKeyToArrayIndex(long rowKey) { + return (int) (rowKey / 64); + } + + @Override + public WritableRowSet build() { + final RowSetBuilderSequential seqBuilder = RowSetFactory.builderSequential(); + for (int ii = firstUsed; ii <= lastUsed; ++ii) { + long word = bitset[ii]; + long rowKey = ii * 64L; + + while (word != 0) { + if ((word & 1) != 0) { + seqBuilder.appendKey(rowKey); + } + rowKey++; + word >>>= 1; + } + } + return seqBuilder.build(); + } + + @Override + public void addKey(final long rowKey) { + if (rowKey >= maxKey) { + return; + } + int index = rowKeyToArrayIndex(rowKey); + if (bitset == null) { + final int maxSize = (maxKey + 63) / 64; + bitset = new long[Math.min(maxSize, (index + 1) * 2)]; + } else if (index >= bitset.length) { + final int maxSize = (maxKey + 63) / 64; + bitset = Arrays.copyOf(bitset, Math.min(maxSize, Math.max(bitset.length * 2, index + 1))); + } + bitset[index] |= 1L << rowKey; + firstUsed = Math.min(index, firstUsed); + lastUsed = Math.max(index, lastUsed); + } + + @Override + public void addRange(final long firstRowKey, final long lastRowKey) { + // This class is used only with aggregation state managers, which never call addRange. 
+ throw new UnsupportedOperationException(); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteStreamSortedFirstOrLastChunkedOperator.java index 2b67c7d4e17..923dfded2a6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final ByteChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharStreamSortedFirstOrLastChunkedOperator.java index c993b44ee86..e8fa4672a25 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharStreamSortedFirstOrLastChunkedOperator.java @@ -133,7 +133,7 @@ private boolean addChunk(@NotNull final CharChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index 280df34f166..2bca15a0c77 100644 --- 
a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -219,7 +219,7 @@ private static QueryTable aggregation( // Construct the result table final QueryTable result = new QueryTable(resultRowSet, resultColumnSourceMap); - ac.propagateInitialStateToOperators(result); + ac.propagateInitialStateToOperators(result, outputPosition.intValue()); if (input.isRefreshing()) { assert keyColumnsCopied != null; @@ -1593,7 +1593,7 @@ private static QueryTable staticGroupedAggregation(QueryTable withView, String k final QueryTable result = new QueryTable(RowSetFactory.flat(responsiveGroups).toTracking(), resultColumnSourceMap); - ac.propagateInitialStateToOperators(result); + ac.propagateInitialStateToOperators(result, responsiveGroups); final ReverseLookupListener rll = ReverseLookupListener.makeReverseLookupListenerWithSnapshot(result, keyName); ac.setReverseLookupFunction(k -> (int) rll.get(k)); @@ -1957,7 +1957,8 @@ private static QueryTable noKeyAggregation( final QueryTable result = new QueryTable(RowSetFactory.flat(initialResultSize).toTracking(), resultColumnSourceMap); - ac.propagateInitialStateToOperators(result); + // always will create one result for zerokey + ac.propagateInitialStateToOperators(result, 1); if (table.isRefreshing()) { ac.startTrackingPrevValues(); @@ -2397,92 +2398,5 @@ public void addRange(final long firstRowKey, final long lastRowKey) { } } - /** - * The output RowSet of an aggregation is fairly special. It is always from zero to the number of output rows, and - * while modifying states we randomly add rows to it, potentially touching the same state many times. The normal - * index random builder does not guarantee those values are de-duplicated and requires O(lg n) operations for each - * insertion and building the RowSet. 
- * - * This version is O(1) for updating a modified slot, then linear in the number of output positions (not the number - * of result values) to build the RowSet. The memory usage is 1 bit per output position, vs. the standard builder is - * 128 bits per used value (though with the possibility of collapsing adjacent ranges when they are modified - * back-to-back). For random access patterns, this version will be more efficient; for friendly patterns the default - * random builder is likely more efficient. - * - * We also know that we will only modify the rows that existed when we start, so that we can clamp the maximum key - * for the builder to the maximum output position without loss of fidelity. - */ - private static class BitmapRandomBuilder implements RowSetBuilderRandom { - - /** - * An upper bound on {@code lastUsed}. That is, the highest bit index that may be used in {@code bitset}. - */ - final int maxKey; - - /** - * The lowest set bit index in {@code bitset}. - */ - int firstUsed = Integer.MAX_VALUE; - - /** - * The highest set bit index in {@code bitset}. - */ - int lastUsed = -1; - - /** - * The bitset itself. 
- */ - long[] bitset; - - private BitmapRandomBuilder(int maxKey) { - this.maxKey = maxKey; - } - - private static int rowKeyToArrayIndex(long rowKey) { - return (int) (rowKey / 64); - } - - @Override - public WritableRowSet build() { - final RowSetBuilderSequential seqBuilder = RowSetFactory.builderSequential(); - for (int ii = firstUsed; ii <= lastUsed; ++ii) { - long word = bitset[ii]; - long rowKey = ii * 64L; - - while (word != 0) { - if ((word & 1) != 0) { - seqBuilder.appendKey(rowKey); - } - rowKey++; - word >>>= 1; - } - } - return seqBuilder.build(); - } - - @Override - public void addKey(final long rowKey) { - if (rowKey >= maxKey) { - return; - } - int index = rowKeyToArrayIndex(rowKey); - if (bitset == null) { - final int maxSize = (maxKey + 63) / 64; - bitset = new long[Math.min(maxSize, (index + 1) * 2)]; - } else if (index >= bitset.length) { - final int maxSize = (maxKey + 63) / 64; - bitset = Arrays.copyOf(bitset, Math.min(maxSize, Math.max(bitset.length * 2, index + 1))); - } - bitset[index] |= 1L << rowKey; - firstUsed = Math.min(index, firstUsed); - lastUsed = Math.max(index, lastUsed); - } - - @Override - public void addRange(final long firstRowKey, final long lastRowKey) { - // This class is used only with aggregation state managers, which never call addRange. 
- throw new UnsupportedOperationException(); - } - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleStreamSortedFirstOrLastChunkedOperator.java index 5598e6b07e7..4410267e9a9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final DoubleChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatStreamSortedFirstOrLastChunkedOperator.java index bd9171b5954..a6834cbc9ea 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final FloatChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java index 0b5b64950ed..203d8eb6451 100644 --- 
a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java @@ -28,7 +28,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; -import java.util.function.LongConsumer; import java.util.function.UnaryOperator; import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE; @@ -36,7 +35,7 @@ /** * An {@link IterativeChunkedAggregationOperator} used in the implementation of {@link Table#applyToAllBy}. */ -class FormulaChunkedOperator implements StateChangeRecorder, IterativeChunkedAggregationOperator { +class FormulaChunkedOperator implements IterativeChunkedAggregationOperator { private final GroupByChunkedOperator groupBy; private final boolean delegateToBy; @@ -101,16 +100,6 @@ class FormulaChunkedOperator implements StateChangeRecorder, IterativeChunkedAgg } } - @Override - public void startRecording(LongConsumer reincarnatedDestinationCallback, LongConsumer emptiedDestinationCallback) { - groupBy.startRecording(reincarnatedDestinationCallback, emptiedDestinationCallback); - } - - @Override - public void finishRecording() { - groupBy.finishRecording(); - } - @Override public void addChunk(final BucketedContext bucketedContext, final Chunk values, @NotNull final LongChunk inputRowKeys, @@ -257,9 +246,9 @@ public void ensureCapacity(final long tableSize) { } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { if (delegateToBy) { - groupBy.propagateInitialState(resultTable); + groupBy.propagateInitialState(resultTable, startingDestinationsCount); } final Map> byResultColumns = groupBy.getResultColumns(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java index fb8018e0ddf..1a860953f3a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java @@ -20,6 +20,7 @@ import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; import io.deephaven.engine.rowset.chunkattributes.RowKeys; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.Arrays; import java.util.LinkedHashMap; @@ -34,20 +35,25 @@ * {@link io.deephaven.api.agg.spec.AggSpecGroup}, and {@link io.deephaven.api.agg.Aggregation#AggGroup(String...)}. */ public final class GroupByChunkedOperator - extends BasicStateChangeRecorder implements IterativeChunkedAggregationOperator { private final QueryTable inputTable; private final boolean registeredWithHelper; private final boolean live; private final ObjectArraySource rowSets; + private final ObjectArraySource addedBuilders; + private final ObjectArraySource removedBuilders; + private final String[] inputColumnNames; private final Map> resultColumns; private final ModifiedColumnSet resultInputsModifiedColumnSet; + private RowSetBuilderRandom stepDestinationsModified; + private boolean stepValuesModified; private boolean someKeyHasAddsOrRemoves; private boolean someKeyHasModifies; + private boolean initialized; GroupByChunkedOperator(@NotNull final QueryTable inputTable, final boolean registeredWithHelper, @NotNull final MatchPair... 
resultColumnPairs) { @@ -55,6 +61,7 @@ public final class GroupByChunkedOperator this.registeredWithHelper = registeredWithHelper; live = inputTable.isRefreshing(); rowSets = new ObjectArraySource<>(WritableRowSet.class); + addedBuilders = new ObjectArraySource<>(Object.class); resultColumns = Arrays.stream(resultColumnPairs).collect(Collectors.toMap(MatchPair::leftColumn, matchPair -> AggregateColumnSource .make(inputTable.getColumnSource(matchPair.rightColumn()), rowSets), @@ -62,9 +69,12 @@ public final class GroupByChunkedOperator inputColumnNames = MatchPair.getRightColumns(resultColumnPairs); if (live) { resultInputsModifiedColumnSet = inputTable.newModifiedColumnSet(inputColumnNames); + removedBuilders = new ObjectArraySource<>(Object.class); } else { resultInputsModifiedColumnSet = null; + removedBuilders = null; } + initialized = false; } @Override @@ -75,13 +85,12 @@ public void addChunk(final BucketedContext bucketedContext, final Chunk 0; // noinspection unchecked - final LongChunk inputIndicesAsOrdered = (LongChunk) inputRowKeys; + final LongChunk inputRowKeysAsOrdered = (LongChunk) inputRowKeys; for (int ii = 0; ii < startPositions.size(); ++ii) { final int startPosition = startPositions.get(ii); final int runLength = length.get(ii); final long destination = destinations.get(startPosition); - - addChunk(inputIndicesAsOrdered, startPosition, runLength, destination); + addChunk(inputRowKeysAsOrdered, startPosition, runLength, destination); } stateModified.fillWithValue(0, startPositions.size(), true); } @@ -94,13 +103,12 @@ public void removeChunk(final BucketedContext bucketedContext, final Chunk 0; // noinspection unchecked - final LongChunk inputIndicesAsOrdered = (LongChunk) inputRowKeys; + final LongChunk inputRowKeysAsOrdered = (LongChunk) inputRowKeys; for (int ii = 0; ii < startPositions.size(); ++ii) { final int startPosition = startPositions.get(ii); final int runLength = length.get(ii); final long destination = 
destinations.get(startPosition); - - removeChunk(inputIndicesAsOrdered, startPosition, runLength, destination); + removeChunk(inputRowKeysAsOrdered, startPosition, runLength, destination); } stateModified.fillWithValue(0, startPositions.size(), true); } @@ -214,53 +222,145 @@ public boolean modifyRowKeys(final SingletonContext context, @NotNull final Long private void addChunk(@NotNull final LongChunk indices, final int start, final int length, final long destination) { - final WritableRowSet rowSet = rowSetForSlot(destination); - final boolean wasEmpty = rowSet.isEmpty(); - rowSet.insert(indices, start, length); - if (wasEmpty && rowSet.isNonempty()) { - onReincarnated(destination); + if (length == 0) { + return; + } + if (!initialized) { + // during initialization, all rows are guaranteed to be in-order + accumulateToBuilderSequential(addedBuilders, indices, start, length, destination); + } else { + accumulateToBuilderRandom(addedBuilders, indices, start, length, destination); + } + if (stepDestinationsModified != null) { + stepDestinationsModified.addKey(destination); } } private void addRowsToSlot(@NotNull final RowSet addRowSet, final long destination) { - final WritableRowSet rowSet = rowSetForSlot(destination); - final boolean wasEmpty = rowSet.isEmpty(); - rowSet.insert(addRowSet); - if (wasEmpty && rowSet.isNonempty()) { - onReincarnated(destination); + if (addRowSet.isEmpty()) { + return; + } + if (!initialized) { + // during initialization, all rows are guaranteed to be in-order + accumulateToBuilderSequential(addedBuilders, addRowSet, destination); + } else { + accumulateToBuilderRandom(addedBuilders, addRowSet, destination); } } private void removeChunk(@NotNull final LongChunk indices, final int start, final int length, final long destination) { - final WritableRowSet rowSet = rowSetForSlot(destination); - final boolean wasNonEmpty = rowSet.isNonempty(); - rowSet.remove(indices, start, length); - if (wasNonEmpty && rowSet.isEmpty()) { - 
onEmptied(destination); + if (length == 0) { + return; } + accumulateToBuilderRandom(removedBuilders, indices, start, length, destination); + stepDestinationsModified.addKey(destination); } - private void doShift(@NotNull final LongChunk preShiftIndices, - @NotNull final LongChunk postShiftIndices, + private void doShift(@NotNull final LongChunk preShiftRowKeys, + @NotNull final LongChunk postShiftRowKeys, final int startPosition, final int runLength, final long destination) { - final WritableRowSet rowSet = rowSetForSlot(destination); - rowSet.remove(preShiftIndices, startPosition, runLength); - rowSet.insert(postShiftIndices, startPosition, runLength); + // treat shift as remove + add + removeChunk(preShiftRowKeys, startPosition, runLength, destination); + addChunk(postShiftRowKeys, startPosition, runLength, destination); + } + + private static void accumulateToBuilderSequential( + @NotNull final ObjectArraySource builderColumn, + @NotNull final LongChunk rowKeysToAdd, + final int start, final int length, final long destination) { + final RowSetBuilderSequential builder = (RowSetBuilderSequential) builderColumn.getUnsafe(destination); + if (builder == null) { + // create (and store) a new builder, fill with these keys + final RowSetBuilderSequential newBuilder = RowSetFactory.builderSequential(); + newBuilder.appendOrderedRowKeysChunk(rowKeysToAdd, start, length); + builderColumn.set(destination, newBuilder); + return; + } + // add the keys to the stored builder + builder.appendOrderedRowKeysChunk(rowKeysToAdd, start, length); + } + + private static void accumulateToBuilderSequential( + @NotNull final ObjectArraySource builderColumn, + @NotNull final RowSet rowSetToAdd, final long destination) { + final RowSetBuilderSequential builder = (RowSetBuilderSequential) builderColumn.getUnsafe(destination); + if (builder == null) { + // create (and store) a new builder, fill with this rowset + final RowSetBuilderSequential newBuilder = RowSetFactory.builderSequential(); 
+ newBuilder.appendRowSequence(rowSetToAdd); + builderColumn.set(destination, newBuilder); + return; + } + // add the rowset to the stored builder + builder.appendRowSequence(rowSetToAdd); + } + + + private static void accumulateToBuilderRandom(@NotNull final ObjectArraySource builderColumn, + @NotNull final LongChunk rowKeysToAdd, + final int start, final int length, final long destination) { + final RowSetBuilderRandom builder = (RowSetBuilderRandom) builderColumn.getUnsafe(destination); + if (builder == null) { + // create (and store) a new builder, fill with these keys + final RowSetBuilderRandom newBuilder = RowSetFactory.builderRandom(); + newBuilder.addOrderedRowKeysChunk(rowKeysToAdd, start, length); + builderColumn.set(destination, newBuilder); + return; + } + // add the keys to the stored builder + builder.addOrderedRowKeysChunk(rowKeysToAdd, start, length); + } + + private static void accumulateToBuilderRandom(@NotNull final ObjectArraySource builderColumn, + @NotNull final RowSet rowSetToAdd, final long destination) { + final RowSetBuilderRandom builder = (RowSetBuilderRandom) builderColumn.getUnsafe(destination); + if (builder == null) { + // create (and store) a new builder, fill with this rowset + final RowSetBuilderRandom newBuilder = RowSetFactory.builderRandom(); + newBuilder.addRowSet(rowSetToAdd); + builderColumn.set(destination, newBuilder); + return; + } + // add the rowset to the stored builder + builder.addRowSet(rowSetToAdd); } - private WritableRowSet rowSetForSlot(final long destination) { - WritableRowSet rowSet = rowSets.getUnsafe(destination); - if (rowSet == null) { - final WritableRowSet empty = RowSetFactory.empty(); - rowSets.set(destination, rowSet = live ? 
empty.toTracking() : empty); + private static WritableRowSet extractAndClearBuilderRandom( + @NotNull final WritableObjectChunk builderChunk, + final int offset) { + final RowSetBuilderRandom builder = builderChunk.get(offset); + if (builder != null) { + final WritableRowSet rowSet = builder.build(); + builderChunk.set(offset, null); + return rowSet; } - return rowSet; + return null; + } + + private static WritableRowSet extractAndClearBuilderSequential( + @NotNull final WritableObjectChunk builderChunk, + final int offset) { + final RowSetBuilderSequential builder = builderChunk.get(offset); + if (builder != null) { + final WritableRowSet rowSet = builder.build(); + builderChunk.set(offset, null); + return rowSet; + } + return null; + } + + private static WritableRowSet nullToEmpty(@Nullable final WritableRowSet rowSet) { + return rowSet == null ? RowSetFactory.empty() : rowSet; } @Override public void ensureCapacity(final long tableSize) { rowSets.ensureCapacity(tableSize); + addedBuilders.ensureCapacity(tableSize); + if (live) { + removedBuilders.ensureCapacity(tableSize); + } } @Override @@ -335,11 +435,122 @@ public void resetForStep(@NotNull final TableUpdate upstream, final int starting && upstream.modifiedColumnSet().containsAny(resultInputsModifiedColumnSet); someKeyHasAddsOrRemoves = false; someKeyHasModifies = false; + stepDestinationsModified = new BitmapRandomBuilder(startingDestinationsCount); + } + + @Override + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { + Assert.neqTrue(initialized, "initialized"); + + // use the builders to create the initial rowsets + try (final RowSet initialDestinations = RowSetFactory.flat(startingDestinationsCount); + final ResettableWritableObjectChunk rowSetResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk addedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final 
RowSequence.Iterator destinationsIterator = + initialDestinations.getRowSequenceIterator()) { + + // noinspection unchecked + final WritableObjectChunk rowSetBackingChunk = + rowSetResettableChunk.asWritableObjectChunk(); + // noinspection unchecked + final WritableObjectChunk addedBuildersBackingChunk = + addedBuildersResettableChunk.asWritableObjectChunk(); + + while (destinationsIterator.hasMore()) { + final long firstSliceDestination = destinationsIterator.peekNextKey(); + final long firstBackingChunkDestination = + rowSets.resetWritableChunkToBackingStore(rowSetResettableChunk, firstSliceDestination); + addedBuilders.resetWritableChunkToBackingStore(addedBuildersResettableChunk, firstSliceDestination); + + final long lastBackingChunkDestination = + firstBackingChunkDestination + rowSetBackingChunk.size() - 1; + final RowSequence initialDestinationsSlice = + destinationsIterator.getNextRowSequenceThrough(lastBackingChunkDestination); + + initialDestinationsSlice.forAllRowKeys((final long destination) -> { + final int backingChunkOffset = + Math.toIntExact(destination - firstBackingChunkDestination); + final WritableRowSet addRowSet = nullToEmpty( + extractAndClearBuilderSequential(addedBuildersBackingChunk, backingChunkOffset)); + rowSetBackingChunk.set(backingChunkOffset, live ? 
addRowSet.toTracking() : addRowSet); + }); + } + } + initialized = true; } @Override - public void propagateUpdates(@NotNull final TableUpdate downstream, - @NotNull final RowSet newDestinations) { + public void propagateUpdates(@NotNull final TableUpdate downstream, @NotNull final RowSet newDestinations) { + // get the rowset for the updated items + try (final WritableRowSet stepDestinations = stepDestinationsModified.build()) { + // add the new destinations so a rowset will get created if it doesn't exist + stepDestinations.insert(newDestinations); + + if (stepDestinations.isEmpty()) { + return; + } + + // use the builders to modify the rowsets + try (final ResettableWritableObjectChunk rowSetResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk addedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk removedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final RowSequence.Iterator destinationsIterator = + stepDestinations.getRowSequenceIterator()) { + + // noinspection unchecked + final WritableObjectChunk rowSetBackingChunk = + rowSetResettableChunk.asWritableObjectChunk(); + // noinspection unchecked + final WritableObjectChunk addedBuildersBackingChunk = + addedBuildersResettableChunk.asWritableObjectChunk(); + // noinspection unchecked + final WritableObjectChunk removedBuildersBackingChunk = + removedBuildersResettableChunk.asWritableObjectChunk(); + + while (destinationsIterator.hasMore()) { + final long firstSliceDestination = destinationsIterator.peekNextKey(); + final long firstBackingChunkDestination = + rowSets.resetWritableChunkToBackingStore(rowSetResettableChunk, firstSliceDestination); + addedBuilders.resetWritableChunkToBackingStore(addedBuildersResettableChunk, + firstSliceDestination); + removedBuilders.resetWritableChunkToBackingStore(removedBuildersResettableChunk, + 
firstSliceDestination); + + final long lastBackingChunkDestination = + firstBackingChunkDestination + rowSetBackingChunk.size() - 1; + final RowSequence initialDestinationsSlice = + destinationsIterator.getNextRowSequenceThrough(lastBackingChunkDestination); + + initialDestinationsSlice.forAllRowKeys((final long destination) -> { + final int backingChunkOffset = + Math.toIntExact(destination - firstBackingChunkDestination); + final WritableRowSet workingRowSet = rowSetBackingChunk.get(backingChunkOffset); + if (workingRowSet == null) { + // use the addRowSet as the new rowset + final WritableRowSet addRowSet = nullToEmpty( + extractAndClearBuilderRandom(addedBuildersBackingChunk, backingChunkOffset)); + rowSetBackingChunk.set(backingChunkOffset, live ? addRowSet.toTracking() : addRowSet); + } else { + try (final WritableRowSet addRowSet = + nullToEmpty(extractAndClearBuilderRandom(addedBuildersBackingChunk, + backingChunkOffset)); + final WritableRowSet removeRowSet = + nullToEmpty(extractAndClearBuilderRandom(removedBuildersBackingChunk, + backingChunkOffset))) { + workingRowSet.remove(removeRowSet); + workingRowSet.insert(addRowSet); + } + } + }); + } + } + stepDestinationsModified = null; + } initializeNewIndexPreviousValues(newDestinations); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntStreamSortedFirstOrLastChunkedOperator.java index c3abd87b3a7..206055bf83a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final IntChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable 
resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java index 3baad4f4be6..13cad9b5b51 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java @@ -275,8 +275,9 @@ default boolean addRowSet(SingletonContext context, RowSet rowSet, long destinat * Perform any internal state keeping needed for destinations that were added during initialization. * * @param resultTable The result {@link QueryTable} after initialization + * @param startingDestinationsCount The number of used destinations at the beginning of this step */ - default void propagateInitialState(@NotNull final QueryTable resultTable) {} + default void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {} /** * Called after initialization; when the operator's result columns must have previous tracking enabled. @@ -301,7 +302,7 @@ default UnaryOperator initializeRefreshing(@NotNull final Que /** * Reset any per-step internal state. Note that the arguments to this method should not be mutated in any way. 
- * + * * @param upstream The upstream ShiftAwareListener.Update * @param startingDestinationsCount The number of used destinations at the beginning of this step */ @@ -315,6 +316,7 @@ default void resetForStep(@NotNull TableUpdate upstream, int startingDestination * @param downstream The downstream TableUpdate (which does not have its {@link ModifiedColumnSet} * finalized yet) * @param newDestinations New destinations added on this update + * */ default void propagateUpdates(@NotNull final TableUpdate downstream, @NotNull final RowSet newDestinations) {} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongStreamSortedFirstOrLastChunkedOperator.java index 0434a9023e7..3a457850f1a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final LongChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectStreamSortedFirstOrLastChunkedOperator.java index 0ae2bf8270a..b1d164da2df 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final ObjectChunk va } 
@Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java index e12f155c060..43f448ae049 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java @@ -529,7 +529,7 @@ public void ensureCapacity(final long tableSize) { public void startTrackingPrevValues() {} @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { Assert.neqTrue(initialized, "initialized"); final RowSet initialDestinations = resultTable.getRowSet(); if (initialDestinations.isNonempty()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortStreamSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortStreamSortedFirstOrLastChunkedOperator.java index 76bc6b23a25..305a6d189fe 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortStreamSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortStreamSortedFirstOrLastChunkedOperator.java @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final ShortChunk values, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamFirstChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamFirstChunkedOperator.java index 1e090e089da..bd5d0ab77a5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamFirstChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamFirstChunkedOperator.java @@ -149,7 +149,7 @@ private boolean maybeAssignFirst(final long destination, final long sourceIndexK } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; Assert.eq(resultTable.size(), "resultTable.size()", nextDestination, "nextDestination"); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamLastChunkedOperator.java index b37fef1356a..8363824d2af 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StreamLastChunkedOperator.java @@ -75,7 +75,7 @@ public boolean addRowSet(final SingletonContext context, } @Override - public void propagateInitialState(@NotNull final QueryTable resultTable) { + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { copyStreamToResult(resultTable.getRowSet()); redirections = null; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java index e3baa25cd91..a24475410e7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java 
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java @@ -184,7 +184,7 @@ public boolean modifyChunk(SingletonContext context, int chunkSize, Chunk