diff --git a/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java b/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java index adaa57e3d17..7eb2e7dafc6 100644 --- a/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java +++ b/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java @@ -428,12 +428,30 @@ public long binarySearchValue(TargetComparator targetComparator, int direction) WritableRowSet subSetByKeyRange(long startKey, long endKey); /** - * Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}. + * Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}. * - * @param posRowSet The RowSet of position-based ranges to extract. + * @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)}) + * @param reversed Whether to treat {@code posRowSet} as offsets relative to {@link #size()} rather than {@code 0} + * @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence} + */ + WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed); + + /** + * Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}. + * + * @param posRowSequence The {@link RowSequence} of position-based ranges to extract. * @return A new RowSet, containing values at the locations in the provided RowSet. */ - WritableRowSet subSetForPositions(RowSet posRowSet); + WritableRowSet subSetForPositions(RowSequence posRowSequence); + + /** + * Get a subset of this RowSet according to the supplied sequence of row positions relative to {@link #size()} in + * {@code posRowSequence}. + * + * @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)}) + * @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence} + */ + WritableRowSet subSetForReversePositions(RowSequence posRowSequence); /** * Returns the row key at the given row position. diff --git a/engine/rowset/src/main/java/io/deephaven/engine/rowset/impl/WritableRowSetImpl.java b/engine/rowset/src/main/java/io/deephaven/engine/rowset/impl/WritableRowSetImpl.java index 01514e5c7ae..8c0662b8317 100644 --- a/engine/rowset/src/main/java/io/deephaven/engine/rowset/impl/WritableRowSetImpl.java +++ b/engine/rowset/src/main/java/io/deephaven/engine/rowset/impl/WritableRowSetImpl.java @@ -23,8 +23,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.Objects; -import java.util.PrimitiveIterator; +import java.util.*; import java.util.function.LongConsumer; public class WritableRowSetImpl extends RowSequenceAsChunkImpl implements WritableRowSet, Externalizable { @@ -335,21 +334,30 @@ public final WritableRowSet subSetByKeyRange(final long startKey, final long end } @Override - public final WritableRowSet subSetForPositions(RowSet posRowSet) { + public final WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed) { + if (reversed) { + return subSetForReversePositions(posRowSequence); + } + return subSetForPositions(posRowSequence); + } + + @Override + public final WritableRowSet subSetForPositions(RowSequence positions) { + if (positions.isEmpty()) { + return RowSetFactory.empty(); + } final MutableLong currentOffset = new MutableLong(); final RowSequence.Iterator iter = getRowSequenceIterator(); final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); - posRowSet.forEachRowKeyRange((start, end) -> { + positions.forEachRowKeyRange((start, end) -> { if (currentOffset.longValue() < start) { // skip items until the beginning of this range iter.getNextRowSequenceWithLength(start - currentOffset.longValue()); currentOffset.setValue(start); } - if (!iter.hasMore()) { return false; } - iter.getNextRowSequenceWithLength(end + 1 - currentOffset.longValue()) .forAllRowKeyRanges(builder::appendRange); currentOffset.setValue(end + 1); @@ -358,6 +366,48 @@ public final WritableRowSet subSetForPositions(RowSet posRowSet) { return builder.build(); } + @Override + public final WritableRowSet subSetForReversePositions(RowSequence positions) { + if (positions.isEmpty()) { + return RowSetFactory.empty(); + } + + final long lastRowPosition = size() - 1; + if (positions.size() == positions.lastRowKey() - positions.firstRowKey() + 1) { + // We have a single range in the input sequence + final long forwardEnd = lastRowPosition - positions.firstRowKey(); + if (forwardEnd < 0) { + // The single range does not overlap with the available positions at all + return RowSetFactory.empty(); + } + // Clamp the single range end to 0 + final long forwardStart = Math.max(lastRowPosition - positions.lastRowKey(), 0); + try (final RowSequence forwardPositions = RowSequenceFactory.forRange(forwardStart, forwardEnd)) { + return subSetForPositions(forwardPositions); + } + } + + // We have some non-trivial input sequence + final RowSetBuilderRandom builder = RowSetFactory.builderRandom(); + positions.forEachRowKeyRange((start, end) -> { + final long forwardEnd = lastRowPosition - start; + if (forwardEnd < 0) { + // This range does not overlap with the available positions at all, and thus neither can subsequent + // ranges that are offset further from the lastRowPosition. + return false; + } + // Clamp the range end to 0 + final long forwardStart = Math.max(lastRowPosition - end, 0); + builder.addRange(forwardStart, forwardEnd); + + // Continue iff subsequent ranges may overlap the available positions + return forwardStart != 0; + }); + try (final RowSequence forwardPositions = builder.build()) { + return subSetForPositions(forwardPositions); + } + } + @Override public final long get(final long rowPosition) { return innerSet.ixGet(rowPosition); diff --git a/engine/rowset/src/test/java/io/deephaven/engine/rowset/impl/WritableRowSetImplTest.java b/engine/rowset/src/test/java/io/deephaven/engine/rowset/impl/WritableRowSetImplTest.java index d432b923464..48ea01d67b0 100644 --- a/engine/rowset/src/test/java/io/deephaven/engine/rowset/impl/WritableRowSetImplTest.java +++ b/engine/rowset/src/test/java/io/deephaven/engine/rowset/impl/WritableRowSetImplTest.java @@ -14,6 +14,7 @@ import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; +import io.deephaven.engine.rowset.impl.rsp.container.MutableInteger; import io.deephaven.util.datastructures.LongRangeIterator; import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.WritableLongChunk; @@ -3250,4 +3251,122 @@ public void testRemoveSrFromRspFullBlockRegression2() { final OrderedLongSet result = rb.ixRemove(sr); assertEquals(card - sr.getCardinality(), result.ixCardinality()); } + + public void testSubSetForPositions() { + final RowSetBuilderSequential b = RowSetFactory.builderSequential(); + final long[] vs = new long[] {3, 4, 5, 8, 10, 12, 29, 31, 44, 45, 46, 59, 60, 61, 72, 65537, 65539, 65536 * 3, + 65536 * 3 + 5}; + for (long v : vs) { + b.appendKey(v); + } + final RowSet ix = b.build(); + final long sz = ix.size(); + + // test the empty ranges + try (final RowSet empty = RowSetFactory.empty()) { + assertEquals("empty range, fwd", ix.subSetForPositions(empty, false).size(), 0); + assertEquals("empty range, rev", ix.subSetForPositions(empty, true).size(), 0); + } + + final int EXTRA_RANGE = 10; + + // single range, deliberately allow some to be outside the upper boundary + for (int start = 0; start < vs.length; start++) { + for (int end = start; end <= vs.length + EXTRA_RANGE; end++) { + + int coercedEnd = Math.min(end, vs.length - 1); + int expectedSize = coercedEnd - start + 1; // rowset ranges are inclusive + + String m = "single range, s=" + start + ", end=" + end; + try (final RowSequence rs = RowSetFactory.fromRange(start, end)) { + try (final RowSet ret = ix.subSetForPositions(rs, false)) { + // verify the size is correct + assertEquals(m + ", fwd: size check", expectedSize, ret.size()); + for (int i = 0; i < ret.size(); i++) { + int idx = start + i; + assertEquals(m + ", fwd: i=" + i, vs[idx], ret.get(i)); + } + } + // now test the reversed functionality + try (final RowSet ret = ix.subSetForPositions(rs, true)) { + // verify the size is correct + assertEquals(m + ", rev: size check", expectedSize, ret.size()); + int lastPos = vs.length - 1; + for (int i = 0; i < ret.size(); i++) { + int idx = lastPos - coercedEnd + i; + assertEquals(m + ", rev: i=" + i, vs[idx], ret.get(i)); + } + } + } + } + } + + // complex ranges, deliberately allow some to be outside the upper boundary + for (int rangeSize = 1; rangeSize < vs.length + EXTRA_RANGE + 1 / 2; rangeSize++) { + String m = "complex range, rangeSize=" + rangeSize; + + final RowSetBuilderSequential rb = RowSetFactory.builderSequential(); + + int expectedSize = 0; + + // create ranges of these sizes, skipping 1 value each time + long idx = 0; + while (idx < vs.length + EXTRA_RANGE) { + long s = idx; + long e = idx + rangeSize - 1; + + // compute the expected size + if (s < vs.length) { + expectedSize += Math.min(e, vs.length - 1) - s + 1; + } + + rb.appendRange(idx, e); // range is inclusive + idx += rangeSize + 1; // skip a value + } + + try (final RowSequence rs = rb.build()) { + try (final RowSet ret = ix.subSetForPositions(rs, false)) { + // verify the size is correct + assertEquals(m + ", fwd: size check", expectedSize, ret.size()); + + // keep track of the index of the working set + final MutableInteger retIndex = new MutableInteger(0); + + rs.forEachRowKeyRange((final long start, final long end) -> { + for (long i = start; i <= end; i++) { + int index = (int) i; + if (index >= 0 && index < vs.length) { + assertEquals(m + ", fwd: i=" + index, vs[index], ret.get(retIndex.value)); + retIndex.value++; + } + } + return true; + }); + } + + // now test the reversed functionality + try (final RowSet ret = ix.subSetForPositions(rs, true)) { + // verify the size is correct + assertEquals(m + ", rev: size check", expectedSize, ret.size()); + + // keep track of the index of the working set + final MutableInteger retIndex = new MutableInteger(0); + final int lastPos = vs.length - 1; + + rs.forEachRowKeyRange((final long start, final long end) -> { + // translate into reversed positions + for (long i = start; i <= end; i++) { + int index = lastPos - (int) i; + if (index >= 0 && index < vs.length) { + assertEquals(m + ", fwd: i=" + index, vs[index], + ret.get(ret.size() - retIndex.value - 1)); + retIndex.value++; + } + } + return true; + }); + } + } + } + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index 1338cd262cd..9111f450f75 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -10,10 +10,8 @@ import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; import io.deephaven.datastructures.util.CollectionUtil; -import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.SharedContext; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.updategraph.UpdateGraphProcessor; import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListener; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; @@ -522,7 +520,7 @@ static InitialSnapshot constructInitialSnapshotInPositionSpace(final Object logI */ public static BarrageMessage constructBackplaneSnapshot(final Object logIdentityObject, final BaseTable table) { - return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null); + return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null, null); } /** @@ -539,9 +537,10 @@ public static BarrageMessage constructBackplaneSnapshot(final Object logIdentity public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Object logIdentityObject, final BaseTable table, @Nullable final BitSet columnsToSerialize, - @Nullable final RowSet positionsToSnapshot) { + @Nullable final RowSet positionsToSnapshot, + @Nullable final RowSet reversePositionsToSnapshot) { return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, columnsToSerialize, - positionsToSnapshot, makeSnapshotControl(false, table)); + positionsToSnapshot, reversePositionsToSnapshot, makeSnapshotControl(false, table)); } /** @@ -559,6 +558,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj @NotNull final BaseTable table, @Nullable final BitSet columnsToSerialize, @Nullable final RowSet positionsToSnapshot, + @Nullable final RowSet reversePositionsToSnapshot, @NotNull final SnapshotControl control) { final BarrageMessage snapshot = new BarrageMessage(); @@ -567,16 +567,30 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> { final RowSet keysToSnapshot; - if (positionsToSnapshot == null) { + if (positionsToSnapshot == null && reversePositionsToSnapshot == null) { keysToSnapshot = null; - } else if (usePrev) { - try (final RowSet prevRowSet = table.getRowSet().copyPrev()) { - keysToSnapshot = prevRowSet.subSetForPositions(positionsToSnapshot); - } } else { - keysToSnapshot = table.getRowSet().subSetForPositions(positionsToSnapshot); + final RowSet rowSetToUse = usePrev ? table.getRowSet().copyPrev() : table.getRowSet(); + try (final SafeCloseable ignored = usePrev ? rowSetToUse : null) { + final WritableRowSet forwardKeys = + positionsToSnapshot == null ? null : rowSetToUse.subSetForPositions(positionsToSnapshot); + final RowSet reverseKeys = reversePositionsToSnapshot == null ? null + : rowSetToUse.subSetForReversePositions(reversePositionsToSnapshot); + if (forwardKeys != null) { + if (reverseKeys != null) { + forwardKeys.insert(reverseKeys); + reverseKeys.close(); + } + keysToSnapshot = forwardKeys; + } else { + keysToSnapshot = reverseKeys; + } + } + } + try (final RowSet ignored = keysToSnapshot) { + return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, + keysToSnapshot); } - return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot); }; snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot); @@ -1273,7 +1287,6 @@ public static boolean serializeAllTable(boolean usePrev, * @param logIdentityObject an object for use with log() messages * @param columnsToSerialize A {@link BitSet} of columns to include, null for all * @param keysToSnapshot A RowSet of keys within the table to include, null for all - * * @return true if the snapshot was computed with an unchanged clock, false otherwise. */ public static boolean serializeAllTable(final boolean usePrev, @@ -1282,6 +1295,7 @@ public static boolean serializeAllTable(final boolean usePrev, final Object logIdentityObject, final BitSet columnsToSerialize, final RowSet keysToSnapshot) { + snapshot.rowsAdded = (usePrev ? table.getRowSet().copyPrev() : table.getRowSet()).copy(); snapshot.rowsRemoved = RowSetFactory.empty(); snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()]; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java index a5bcf3eeff0..f12dce7ba51 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java @@ -43,6 +43,7 @@ public static class AddColumnData { public boolean isSnapshot; public RowSet snapshotRowSet; + public boolean snapshotRowSetIsReversed; public BitSet snapshotColumns; public RowSet rowsAdded; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ShiftInversionHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ShiftInversionHelper.java index 3b397507916..a3b2aab7d8e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ShiftInversionHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ShiftInversionHelper.java @@ -10,31 +10,48 @@ */ public class ShiftInversionHelper { - final RowSetShiftData shifted; + private final RowSetShiftData shifted; + private final boolean reverseOrder; - private int destShiftIdx = 0; + private int destShiftIdx; public ShiftInversionHelper(final RowSetShiftData shifted) { + // if not specified, assume forward viewport ordering + this(shifted, false); + } + + public ShiftInversionHelper(final RowSetShiftData shifted, final boolean reverseOrder) { this.shifted = shifted; + this.reverseOrder = reverseOrder; + this.destShiftIdx = reverseOrder ? shifted.size() : 0; } private void advanceDestShiftIdx(final long destKey) { Assert.geq(destKey, "destKey", 0); - destShiftIdx = (int) binarySearch(destShiftIdx, shifted.size(), innerShiftIdx -> { - long destEnd = shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx); - // due to destKey's expected range, we know this subtraction will not overflow - return destEnd - destKey; - }); + destShiftIdx = (int) binarySearch( + reverseOrder ? 0 : destShiftIdx, + reverseOrder ? destShiftIdx : shifted.size(), + innerShiftIdx -> { + long destEnd = + shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx); + // due to destKey's expected range, we know this subtraction will not overflow + return destEnd - destKey; + }); } /** * Converts post-keyspace key to pre-keyspace key. It expects to be invoked in ascending key order. */ public long mapToPrevKeyspace(final long key, final boolean isEnd) { + if (shifted.empty()) { + return key; + } + advanceDestShiftIdx(key); final long retval; final int idx = destShiftIdx; + if (idx < shifted.size() && shifted.getBeginRange(idx) + shifted.getShiftDelta(idx) <= key) { // inside of a destination shift; this is easy to map to prev retval = key - shifted.getShiftDelta(idx); diff --git a/extensions/barrage/build.gradle b/extensions/barrage/build.gradle index eb655d8fa45..27eee716d38 100644 --- a/extensions/barrage/build.gradle +++ b/extensions/barrage/build.gradle @@ -16,7 +16,7 @@ dependencies { api project(':engine-table') implementation project(':proto:proto-backplane-grpc-flight') implementation project(':log-factory') - api "io.deephaven.barrage:barrage-format:0.4.0" + api "io.deephaven.barrage:barrage-format:0.4.1" Classpaths.inheritFlatbuffer(project, 'implementation') diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 45e8a4c6a02..1a446578e06 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -86,6 +86,7 @@ public class BarrageTable extends QueryTable implements BarrageMessage.Listener, * re-send data that the client should already have within the existing viewport. */ private RowSet serverViewport; + private boolean serverReverseViewport; private BitSet serverColumns; @@ -198,8 +199,7 @@ public void handleBarrageError(Throwable t) { enqueueError(t); } - private UpdateCoalescer processUpdate(final BarrageMessage update, - final UpdateCoalescer coalescer) { + private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateCoalescer coalescer) { if (DEBUG_ENABLED) { saveForDebugging(update); @@ -223,6 +223,7 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, if (update.isSnapshot) { serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy(); + serverReverseViewport = update.snapshotRowSetIsReversed; serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone(); } @@ -233,7 +234,9 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, try (final RowSet currRowsFromPrev = currentRowSet.copy(); final WritableRowSet populatedRows = - (serverViewport != null ? currentRowSet.subSetForPositions(serverViewport) : null)) { + (serverViewport != null + ? currentRowSet.subSetForPositions(serverViewport, serverReverseViewport) + : null)) { // removes currentRowSet.remove(update.rowsRemoved); @@ -307,7 +310,8 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, // remove all data outside of our viewport if (serverViewport != null) { - try (final RowSet newPopulated = currentRowSet.subSetForPositions(serverViewport)) { + try (final RowSet newPopulated = + currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)) { populatedRows.remove(newPopulated); freeRows(populatedRows); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java index dbe71bed4cd..60cb74c16b0 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java @@ -81,6 +81,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, msg = new BarrageMessage(); msg.isSnapshot = metadata.isSnapshot(); + msg.snapshotRowSetIsReversed = metadata.effectiveReverseViewport(); + numAddBatchesRemaining = metadata.numAddBatches(); numModBatchesRemaining = metadata.numModBatches(); if (numAddBatchesRemaining > 1 || numModBatchesRemaining > 1) { diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java index 042c8d25d73..6487b3e0d16 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java @@ -106,6 +106,35 @@ protected void execute(final BarrageSession client) throws Exception { TableTools.show(table); } + // example #6 - reverse viewport, all columns + try (final TableHandle handle = manager.executeLogic(logic()); + final RowSet viewport = RowSetFactory.flat(5); // range inclusive + + final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + + // expect this to block until all reading complete + final BarrageTable table = snapshot.partialTable(viewport, null, true); + + System.out.println("Table info: rows = " + table.size() + ", cols = " + table.getColumns().length); + TableTools.show(table); + } + + // example #7 - reverse viewport, some columns + try (final TableHandle handle = manager.executeLogic(logic()); + final RowSet viewport = RowSetFactory.flat(5); // range inclusive + + final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + + final BitSet columns = new BitSet(); + columns.set(0, 2); // range not inclusive (sets bits 0-1) + + // expect this to block until all reading complete + final BarrageTable table = snapshot.partialTable(viewport, columns, true); + + System.out.println("Table info: rows = " + table.size() + ", cols = " + table.getColumns().length); + TableTools.show(table); + } + System.out.println("End of Snapshot examples"); } diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java index 978711be4c7..ca5d442c08e 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java @@ -8,6 +8,7 @@ import io.deephaven.client.impl.BarrageSubscription; import io.deephaven.client.impl.TableHandle; import io.deephaven.client.impl.TableHandleManager; +import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; @@ -19,6 +20,12 @@ abstract class SubscribeExampleBase extends BarrageClientExampleBase { + @CommandLine.Option(names = {"--tail"}, required = false, description = "Tail viewport size") + int tailSize = 0; + + @CommandLine.Option(names = {"--head"}, required = false, description = "Header viewport size") + int headerSize = 0; + static class Mode { @CommandLine.Option(names = {"-b", "--batch"}, required = true, description = "Batch mode") boolean batch; @@ -42,7 +49,19 @@ protected void execute(final BarrageSession client) throws Exception { try (final TableHandle handle = manager.executeLogic(logic()); final BarrageSubscription subscription = client.subscribe(handle, options)) { - final BarrageTable table = subscription.entireTable(); + + final BarrageTable table; + if (headerSize > 0) { + // create a table subscription with forward viewport of the specified size + table = subscription.partialTable(RowSetFactory.flat(headerSize), null, true); + } else if (tailSize > 0) { + // create a table subscription with reverse viewport of the specified size + table = subscription.partialTable(RowSetFactory.flat(tailSize), null, true); + } else { + // create a table subscription of the entire table + table = subscription.entireTable(); + } + final CountDownLatch countDownLatch = new CountDownLatch(1); table.listenForUpdates(new InstrumentedTableUpdateListener("example-listener") { diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java index 80f4cb66265..a20ca1bb6eb 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java @@ -50,7 +50,22 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options) * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with * the data that is received. * + * @param viewport the position-space viewport to use for the snapshot + * @param columns the columns to include in the snapshot + * * @return the {@code BarrageTable} */ BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException; + + /** + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with + * the data that is received. Allows the viewport to be reversed. + * + * @param viewport the position-space viewport to use for the snapshot + * @param columns the columns to include in the snapshot + * @param reverseViewport Whether to treat {@code posRowSet} as offsets from {@link #size()} rather than {@code 0} + * + * @return the {@code BarrageTable} + */ + BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException; } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index 7719131a134..e1425b8a97f 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -152,6 +152,12 @@ public BarrageTable entireTable() throws InterruptedException { @Override public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException { + return partialTable(viewport, columns, false); + } + + @Override + public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) + throws InterruptedException { // notify user when connection has already been used and closed if (prevUsed) { throw new UnsupportedOperationException("Snapshot object already used"); @@ -178,7 +184,7 @@ public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) t // Send the snapshot request: observer.onNext(FlightData.newBuilder() - .setAppMetadata(ByteStringAccess.wrap(makeRequestInternal(viewport, columns, options))) + .setAppMetadata(ByteStringAccess.wrap(makeRequestInternal(viewport, columns, reverseViewport, options))) .build()); observer.onCompleted(); @@ -255,6 +261,7 @@ public LogOutput append(final LogOutput logOutput) { private ByteBuffer makeRequestInternal( @Nullable final RowSet viewport, @Nullable final BitSet columns, + boolean reverseViewport, @Nullable BarrageSnapshotOptions options) { final FlatBufferBuilder metadata = new FlatBufferBuilder(); @@ -278,6 +285,7 @@ private ByteBuffer makeRequestInternal( BarrageSnapshotRequest.addViewport(metadata, vpOffset); BarrageSnapshotRequest.addSnapshotOptions(metadata, optOffset); BarrageSnapshotRequest.addTicket(metadata, ticOffset); + BarrageSnapshotRequest.addReverseViewport(metadata, reverseViewport); metadata.finish(BarrageSnapshotRequest.endBarrageSnapshotRequest(metadata)); final FlatBufferBuilder wrapper = new FlatBufferBuilder(); diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java index 4cd1cd23cc7..a3569db77c6 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java @@ -52,8 +52,23 @@ BarrageSubscription subscribe(TableSpec tableSpec, BarrageSubscriptionOptions op * Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable} * with the data that is received. * + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription + * * @return the {@code BarrageTable} */ BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException; + /** + * Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable} + * with the data that is received. Allows the viewport to be reversed. + * + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription + * @param reverseViewport Whether to treat {@code posRowSet} as offsets from {@link #size()} rather than {@code 0} + * + * @return the {@code BarrageTable} + */ + BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException; + } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index c77a43e0982..58987a17891 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -134,11 +134,16 @@ public void onCompleted() { @Override public BarrageTable entireTable() { - return partialTable(null, null); + return partialTable(null, null, false); } @Override public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) { + return partialTable(viewport, columns, false); + } + + @Override + public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) { if (!connected) { throw new UncheckedDeephavenException( this + " is no longer an active subscription and cannot be retained further"); @@ -146,7 +151,8 @@ public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) { if (!subscribed) { // Send the initial subscription: observer.onNext(FlightData.newBuilder() - .setAppMetadata(ByteStringAccess.wrap(makeRequestInternal(viewport, columns, options))) + .setAppMetadata( + ByteStringAccess.wrap(makeRequestInternal(viewport, columns, reverseViewport, options))) .build()); subscribed = true; } @@ -192,6 +198,7 @@ public LogOutput append(final LogOutput logOutput) { private ByteBuffer makeRequestInternal( @Nullable final RowSet viewport, @Nullable final BitSet columns, + boolean reverseViewport, @Nullable BarrageSubscriptionOptions options) { final FlatBufferBuilder metadata = new FlatBufferBuilder(); @@ -215,6 +222,7 @@ private ByteBuffer makeRequestInternal( BarrageSubscriptionRequest.addViewport(metadata, vpOffset); BarrageSubscriptionRequest.addSubscriptionOptions(metadata, optOffset); BarrageSubscriptionRequest.addTicket(metadata, ticOffset); + BarrageSubscriptionRequest.addReverseViewport(metadata, reverseViewport); metadata.finish(BarrageSubscriptionRequest.endBarrageSubscriptionRequest(metadata)); final FlatBufferBuilder wrapper = new FlatBufferBuilder(); diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 7c92dfce22b..5491beaeffb 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -468,24 +468,33 @@ public void handleMessage(BarrageProtoUtil.MessageInfo message) { hasColumns ? BitSet.valueOf(snapshotRequest.columnsAsByteBuffer()) : null; final boolean hasViewport = snapshotRequest.viewportVector() != null; - final RowSet viewport = + RowSet viewport = hasViewport ? BarrageProtoUtil.toRowSet(snapshotRequest.viewportAsByteBuffer()) : null; - // get ourselves some data! - final BarrageMessage msg = - ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(this, - table, columns, viewport); + final boolean reverseViewport = snapshotRequest.reverseViewport(); + + // get ourselves some data (reversing viewport as instructed) + final BarrageMessage msg; + if (reverseViewport) { + msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(this, table, + columns, null, viewport); + } else { + msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(this, table, + columns, viewport, null); + } msg.modColumnData = ZERO_MOD_COLUMNS; // no mod column data // translate the viewport to keyspace and make the call try (final BarrageStreamGenerator bsg = new BarrageStreamGenerator(msg); final RowSet keySpaceViewport = - hasViewport ? msg.rowsAdded.subSetForPositions(viewport) : null) { + hasViewport + ? msg.rowsAdded.subSetForPositions(viewport, reverseViewport) + : null) { listener.onNext( bsg.getSnapshotView(DEFAULT_SNAPSHOT_DESER_OPTIONS, viewport, - keySpaceViewport, columns)); + reverseViewport, keySpaceViewport, columns)); } listener.onCompleted(); @@ -604,7 +613,10 @@ private synchronized void onExportResolved(final SessionState.ExportObject { * @param options serialization options for this specific view * @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener * @param viewport is the position-space viewport + * @param reverseViewport is the viewport reversed (relative to end of table instead of beginning) * @param keyspaceViewport is the key-space viewport * @param subscribedColumns are the columns subscribed for this view * @return a MessageView filtered by the subscription properties that can be sent to that subscriber */ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot, @Nullable RowSet viewport, - @Nullable RowSet keyspaceViewport, BitSet subscribedColumns); + boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns); /** * Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor. @@ -174,11 +150,11 @@ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnap * * @param options serialization options for this specific view * @param viewport is the position-space viewport - * @param keyspaceViewport is the key-space viewport + * @param reverseViewport is the viewport reversed (relative to end of table instead of beginning) * @param snapshotColumns are the columns included for this view * @return a MessageView filtered by the snapshot properties that can be sent to that requestor */ - MessageView getSnapshotView(BarrageSnapshotOptions options, @Nullable RowSet viewport, + MessageView getSnapshotView(BarrageSnapshotOptions options, @Nullable RowSet viewport, boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet snapshotColumns); } @@ -347,7 +323,11 @@ public void close() { * notes on {@link Subscription} for details of the subscription life cycle. */ private RowSet activeViewport = null; + private RowSet activeReverseViewport = null; + private RowSet postSnapshotViewport = null; + private RowSet postSnapshotReverseViewport = null; + private final BitSet activeColumns = new BitSet(); private final BitSet postSnapshotColumns = new BitSet(); private final BitSet objectColumnsToClear = new BitSet(); @@ -445,20 +425,25 @@ private class Subscription { RowSet viewport; // active viewport BitSet subscribedColumns; // active subscription columns + boolean reverseViewport = false; // is the active viewport reversed (indexed from end of table) boolean isActive = false; // is this subscription in our active list? boolean pendingDelete = false; // is this subscription deleted as far as the client is concerned? boolean hasPendingUpdate = false; // is this subscription in our pending list? boolean pendingInitialSnapshot = true; // do we need to send the initial snapshot? RowSet pendingViewport; // if an update is pending this is our new viewport + boolean pendingReverseViewport; // is the pending viewport reversed (indexed from end of table) BitSet pendingColumns; // if an update is pending this is our new column subscription set + RowSet snapshotViewport = null; // captured viewport during snapshot portion of propagation job BitSet snapshotColumns = null; // captured column during snapshot portion of propagation job + boolean snapshotReverseViewport = false; // captured setting during snapshot portion of propagation job private Subscription(final StreamObserver listener, final BarrageSubscriptionOptions options, final BitSet subscribedColumns, - final @Nullable RowSet initialViewport) { + final @Nullable RowSet initialViewport, + final boolean reverseViewport) { this.options = options; this.listener = listener; this.logPrefix = "Sub{" + Integer.toHexString(System.identityHashCode(listener)) + "}: "; @@ -466,6 +451,7 @@ private Subscription(final StreamObserver listener, this.subscribedColumns = new BitSet(); this.pendingColumns = subscribedColumns; this.pendingViewport = initialViewport; + this.pendingReverseViewport = this.reverseViewport = reverseViewport; } public boolean isViewport() { @@ -484,7 +470,8 @@ public boolean isViewport() { public void addSubscription(final StreamObserver listener, final BarrageSubscriptionOptions options, final @Nullable BitSet columnsToSubscribe, - final @Nullable RowSet initialViewport) { + final @Nullable RowSet initialViewport, + final boolean reverseViewport) { synchronized (this) { final boolean hasSubscription = activeSubscriptions.stream().anyMatch(item -> item.listener == listener) || pendingSubscriptions.stream().anyMatch(item -> item.listener == listener); @@ -500,7 +487,8 @@ public void addSubscription(final StreamObserver listener, } else { cols = (BitSet) columnsToSubscribe.clone(); } - final Subscription subscription = new Subscription(listener, options, cols, initialViewport); + final Subscription subscription = + new Subscription(listener, options, cols, initialViewport, reverseViewport); log.info().append(logPrefix) .append(subscription.logPrefix) @@ -556,11 +544,17 @@ public boolean updateSubscription(final StreamObserver listener, public boolean updateViewport(final StreamObserver listener, final RowSet newViewport) { + return updateViewport(listener, newViewport, false); + } + + public boolean updateViewport(final StreamObserver listener, final RowSet newViewport, + final boolean newReverseViewport) { return findAndUpdateSubscription(listener, sub -> { if (sub.pendingViewport != null) { sub.pendingViewport.close(); } sub.pendingViewport = newViewport.copy(); + sub.pendingReverseViewport = newReverseViewport; if (sub.pendingColumns == null) { sub.pendingColumns = (BitSet) sub.subscribedColumns.clone(); } @@ -571,11 +565,17 @@ public boolean updateViewport(final StreamObserver listener, public boolean updateViewportAndColumns(final StreamObserver listener, final RowSet newViewport, final BitSet columnsToSubscribe) { + return updateViewportAndColumns(listener, newViewport, columnsToSubscribe); + } + + public boolean updateViewportAndColumns(final StreamObserver listener, final RowSet newViewport, + final BitSet columnsToSubscribe, final boolean newReverseViewport) { return findAndUpdateSubscription(listener, sub -> { if (sub.pendingViewport != null) { sub.pendingViewport.close(); } sub.pendingViewport = newViewport.copy(); + sub.pendingReverseViewport = newReverseViewport; sub.pendingColumns = (BitSet) columnsToSubscribe.clone(); log.info().append(logPrefix).append(sub.logPrefix) .append("scheduling update immediately, for viewport and column updates.").endl(); @@ -689,8 +689,22 @@ private void enqueueUpdate(final TableUpdate upstream) { if (numFullSubscriptions > 0) { addsToRecord = upstream.added().copy(); modsToRecord = upstream.modified().copy(); - } else if (activeViewport != null) { - try (final WritableRowSet deltaViewport = rowSet.subSetForPositions(activeViewport)) { + } else if (activeViewport != null || activeReverseViewport != null) { + // build the combined position-space viewport (from forward and reverse) + try (final WritableRowSet forwardDeltaViewport = + activeViewport == null ? null : rowSet.subSetForPositions(activeViewport); + final WritableRowSet reverseDeltaViewport = activeReverseViewport == null ? null + : rowSet.subSetForReversePositions(activeReverseViewport)) { + final RowSet deltaViewport; + if (forwardDeltaViewport != null) { + if (reverseDeltaViewport != null) { + forwardDeltaViewport.insert(reverseDeltaViewport); + } + deltaViewport = forwardDeltaViewport; + } else { + deltaViewport = reverseDeltaViewport; + } + addsToRecord = deltaViewport.intersect(upstream.added()); modsToRecord = deltaViewport.intersect(upstream.modified()); } @@ -704,7 +718,8 @@ private void enqueueUpdate(final TableUpdate upstream) { // Note: viewports are in position space, inserted and removed rows may cause the keyspace for a given viewport // to shift. Let's compute which rows are being scoped into view. If current RowSet is empty, we have nothing to // store. If prev RowSet is empty, all rows are new and are already in addsToRecord. - if (activeViewport != null && (upstream.added().isNonempty() || upstream.removed().isNonempty()) + if ((activeViewport != null || activeReverseViewport != null) + && (upstream.added().isNonempty() || upstream.removed().isNonempty()) && rowSet.isNonempty() && rowSet.sizePrev() > 0) { final RowSetBuilderRandom scopedViewBuilder = RowSetFactory.builderRandom(); @@ -715,23 +730,74 @@ private void enqueueUpdate(final TableUpdate upstream) { continue; } - final ShiftInversionHelper inverter = new ShiftInversionHelper(upstream.shifted()); + final ShiftInversionHelper inverter = + new ShiftInversionHelper(upstream.shifted(), sub.reverseViewport); sub.viewport.forAllRowKeyRanges((posStart, posEnd) -> { + final long localStart, localEnd; + + // handle reverse viewports + if (sub.reverseViewport) { + // compute positions to be relative to the final position of rowSet + final long lastRowPosition = rowSet.size() - 1; + + localStart = Math.max(lastRowPosition - posEnd, 0); + localEnd = lastRowPosition - posStart; + + if (localEnd < 0) { + // This range does not overlap with the available positions at all + return; + } + } else { + localStart = posStart; + localEnd = posEnd; + } + // Note: we already know that both rowSet and prevRowSet are non-empty. - final long currKeyStart = - inverter.mapToPrevKeyspace(rowSet.get(Math.min(posStart, rowSet.size() - 1)), false); - final long currKeyEnd = - inverter.mapToPrevKeyspace(rowSet.get(Math.min(posEnd, rowSet.size() - 1)), true); + final long currKeyStart, currKeyEnd; + if (sub.reverseViewport) { + // using the reverse ShiftHelper, must pass `key` in descending order + currKeyEnd = + inverter.mapToPrevKeyspace(rowSet.get(Math.min(localEnd, rowSet.size() - 1)), true); + currKeyStart = + inverter.mapToPrevKeyspace(rowSet.get(Math.min(localStart, rowSet.size() - 1)), + false); + } else { + // using the forward ShiftHelper, must pass `key` in ascending order + currKeyStart = + inverter.mapToPrevKeyspace(rowSet.get(Math.min(localStart, rowSet.size() - 1)), + false); + currKeyEnd = + inverter.mapToPrevKeyspace(rowSet.get(Math.min(localEnd, rowSet.size() - 1)), true); + } // if our current viewport includes no previous values this range may be empty if (currKeyEnd < currKeyStart) { return; } + final long prevStart; + final long prevEnd; + if (sub.reverseViewport) { + final long lastPrevRowPosition = prevRowSet.size() - 1; + + prevStart = Math.max(lastPrevRowPosition - posEnd, 0); + prevEnd = lastPrevRowPosition - posStart; // this can be left of the prev rowset (i.e. <0) + } else { + prevStart = localStart; + prevEnd = localEnd; // this can be right of the prev rowset (i.e. >= size()) + } + + // get the key that represents the start of the viewport in the prev rowset key space or + // prevRowSet.lastRowKey() + 1 if the start is past the end of prev rowset final long prevKeyStart = - posStart >= prevRowSet.size() ? prevRowSet.lastRowKey() + 1 : prevRowSet.get(posStart); - final long prevKeyEnd = prevRowSet.get(Math.min(posEnd, prevRowSet.size() - 1)); + prevStart >= prevRowSet.size() ? prevRowSet.lastRowKey() + 1 + : prevRowSet.get(prevStart); + + // get the key that represents the end of the viewport in the prev rowset key space or + // -1 if the end is before the beginning of prev rowset + final long prevKeyEnd = + prevEnd < 0 ? -1 : prevRowSet.get(Math.min(prevEnd, prevRowSet.size() - 1)); // Note: we already know that scoped rows must touch viewport boundaries if (currKeyStart < prevKeyStart) { @@ -795,14 +861,8 @@ private void enqueueUpdate(final TableUpdate upstream) { final BiConsumer recordRows = (keysToAdd, columnsToRecord) -> { try (final RowSequence.Iterator rsIt = keysToAdd.getRowSequenceIterator()) { while (rsIt.hasMore()) { - final RowSequence srcKeys = rsIt.getNextRowSequenceWithLength(DELTA_CHUNK_SIZE); // NB: This - // will - // never - // return - // more - // keys - // than - // deltaChunkSize + // NB: This will never return more keys than deltaChunkSize + final RowSequence srcKeys = rsIt.getNextRowSequenceWithLength(DELTA_CHUNK_SIZE); try (final RowSequence dstKeys = RowSequenceFactory.forRange(nextFreeDeltaKey, nextFreeDeltaKey + srcKeys.size() - 1)) { @@ -932,6 +992,8 @@ private void updateSubscriptionsSnapshotAndPropagate() { boolean firstSubscription = false; BitSet snapshotColumns = null; RowSetBuilderRandom snapshotRows = null; + RowSetBuilderRandom reverseSnapshotRows = null; + List updatedSubscriptions = null; // first, we take out any new subscriptions (under the lock) @@ -956,6 +1018,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { needsSnapshot = true; snapshotColumns = new BitSet(); snapshotRows = RowSetFactory.builderRandom(); + reverseSnapshotRows = RowSetFactory.builderRandom(); } subscription.hasPendingUpdate = false; @@ -976,7 +1039,12 @@ private void updateSubscriptionsSnapshotAndPropagate() { subscription.snapshotViewport = subscription.pendingViewport; subscription.pendingViewport = null; if (!needsFullSnapshot) { - snapshotRows.addRowSet(subscription.snapshotViewport); + // track forward and reverse viewport rows separately + if (subscription.pendingReverseViewport) { + reverseSnapshotRows.addRowSet(subscription.snapshotViewport); + } else { + snapshotRows.addRowSet(subscription.snapshotViewport); + } } } @@ -988,11 +1056,15 @@ private void updateSubscriptionsSnapshotAndPropagate() { needsFullSnapshot = true; } } - } + + subscription.snapshotReverseViewport = subscription.pendingReverseViewport; + } // end updatedSubscriptions loop boolean haveViewport = false; postSnapshotColumns.clear(); + final RowSetBuilderRandom postSnapshotViewportBuilder = RowSetFactory.builderRandom(); + final RowSetBuilderRandom postSnapshotReverseViewportBuilder = RowSetFactory.builderRandom(); for (int i = 0; i < activeSubscriptions.size(); ++i) { final Subscription sub = activeSubscriptions.get(i); @@ -1009,13 +1081,20 @@ private void updateSubscriptionsSnapshotAndPropagate() { if (sub.isViewport()) { haveViewport = true; - postSnapshotViewportBuilder - .addRowSet(sub.snapshotViewport != null ? sub.snapshotViewport : sub.viewport); + // handle forward and reverse snapshots separately + if (sub.snapshotReverseViewport) { + postSnapshotReverseViewportBuilder + .addRowSet(sub.snapshotViewport != null ? sub.snapshotViewport : sub.viewport); + } else { + postSnapshotViewportBuilder + .addRowSet(sub.snapshotViewport != null ? sub.snapshotViewport : sub.viewport); + } } postSnapshotColumns.or(sub.snapshotColumns != null ? sub.snapshotColumns : sub.subscribedColumns); } postSnapshotViewport = haveViewport ? postSnapshotViewportBuilder.build() : null; + postSnapshotReverseViewport = haveViewport ? postSnapshotReverseViewportBuilder.build() : null; if (!needsSnapshot) { // i.e. We have only removed subscriptions; we can update this state immediately. @@ -1031,9 +1110,11 @@ private void updateSubscriptionsSnapshotAndPropagate() { // then we spend the effort to take a snapshot if (needsSnapshot) { - try (final RowSet snapshotRowSet = snapshotRows.build()) { + try (final RowSet snapshotRowSet = snapshotRows.build(); + final RowSet reverseSnapshotRowSet = reverseSnapshotRows.build()) { snapshot = - getSnapshot(updatedSubscriptions, snapshotColumns, needsFullSnapshot ? null : snapshotRowSet); + getSnapshot(updatedSubscriptions, snapshotColumns, needsFullSnapshot ? null : snapshotRowSet, + needsFullSnapshot ? null : reverseSnapshotRowSet); } } @@ -1087,7 +1168,6 @@ private void updateSubscriptionsSnapshotAndPropagate() { objectColumnsToClear.and(activeColumns); } - nextFreeDeltaKey = 0; for (final Delta delta : pendingDeltas) { delta.close(); @@ -1151,9 +1231,12 @@ private void propagateToSubscribers(final BarrageMessage message, final RowSet p : subscription.subscribedColumns; try (final RowSet clientView = - subscription.isViewport() ? propRowSetForMessage.subSetForPositions(vp) : null) { + subscription.isViewport() + ? propRowSetForMessage.subSetForPositions(vp, subscription.reverseViewport) + : null) { subscription.listener - .onNext(generator.getSubView(subscription.options, false, vp, clientView, cols)); + .onNext(generator.getSubView(subscription.options, false, vp, subscription.reverseViewport, + clientView, cols)); } catch (final Exception e) { try { subscription.listener.onError(GrpcUtil.securelyWrapError(log, e)); @@ -1208,7 +1291,9 @@ private void propagateSnapshotForSubscription(final Subscription subscription, final boolean isViewport = subscription.viewport != null; try (final RowSet keySpaceViewport = - isViewport ? snapshotGenerator.getMessage().rowsAdded.subSetForPositions(subscription.viewport) + isViewport + ? snapshotGenerator.getMessage().rowsAdded + .subSetForPositions(subscription.viewport, subscription.reverseViewport) : null) { if (subscription.pendingInitialSnapshot) { // Send schema metadata to this new client. @@ -1219,7 +1304,8 @@ private void propagateSnapshotForSubscription(final Subscription subscription, subscription.listener .onNext(snapshotGenerator.getSubView(subscription.options, subscription.pendingInitialSnapshot, - subscription.viewport, keySpaceViewport, subscription.subscribedColumns)); + subscription.viewport, subscription.reverseViewport, keySpaceViewport, + subscription.subscribedColumns)); } catch (final Exception e) { GrpcUtil.safelyExecute(() -> subscription.listener.onError(GrpcUtil.securelyWrapError(log, e))); removeSubscription(subscription.listener); @@ -1265,9 +1351,11 @@ private BarrageMessage aggregateUpdatesInRange(final int startDelta, final int e modColumnSet = firstDelta.modifiedColumns; downstream.rowsAdded = firstDelta.update.added().copy(); + downstream.rowsRemoved = firstDelta.update.removed().copy(); downstream.shifted = firstDelta.update.shifted(); downstream.rowsIncluded = firstDelta.recordedAdds.copy(); + downstream.addColumnData = new BarrageMessage.AddColumnData[sourceColumns.length]; downstream.modColumnData = new BarrageMessage.ModColumnData[sourceColumns.length]; @@ -1551,6 +1639,7 @@ private void flipSnapshotStateForSubscriptions(final List subscrip if (subscription.snapshotViewport != null) { final RowSet tmp = subscription.viewport; subscription.viewport = subscription.snapshotViewport; + subscription.reverseViewport = subscription.snapshotReverseViewport; subscription.snapshotViewport = tmp; } if (subscription.snapshotColumns != null) { @@ -1567,8 +1656,19 @@ private void promoteSnapshotToActive() { if (this.activeViewport != null) { this.activeViewport.close(); } - this.activeViewport = this.postSnapshotViewport; + if (this.activeReverseViewport != null) { + this.activeReverseViewport.close(); + } + + this.activeViewport = this.postSnapshotViewport == null || this.postSnapshotViewport.isEmpty() ? null + : this.postSnapshotViewport; + + this.activeReverseViewport = + this.postSnapshotReverseViewport == null || this.postSnapshotReverseViewport.isEmpty() ? null + : this.postSnapshotReverseViewport; + this.postSnapshotViewport = null; + this.postSnapshotReverseViewport = null; // Pre-condition: activeObjectColumns == objectColumns & activeColumns this.objectColumnsToClear.or(postSnapshotColumns); @@ -1648,7 +1748,8 @@ public boolean snapshotCompletedConsistently(final long afterClockValue, final b @VisibleForTesting BarrageMessage getSnapshot(final List snapshotSubscriptions, final BitSet columnsToSnapshot, - final RowSet positionsToSnapshot) { + final RowSet positionsToSnapshot, + final RowSet reversePositionsToSnapshot) { if (onGetSnapshot != null) { onGetSnapshot.run(); } @@ -1657,7 +1758,8 @@ BarrageMessage getSnapshot(final List snapshotSubscriptions, // TODO: Let notification-indifferent use cases skip notification test final SnapshotControl snapshotControl = new SnapshotControl(snapshotSubscriptions); return ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( - this, parent, columnsToSnapshot, positionsToSnapshot, snapshotControl); + this, parent, columnsToSnapshot, positionsToSnapshot, reversePositionsToSnapshot, + snapshotControl); } //////////////////////////////////////////////////// diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java b/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java index 61d6644da37..f01be8b3fc1 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageStreamGenerator.java @@ -190,6 +190,7 @@ public void close() { * @param options serialization options for this specific view * @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener * @param viewport is the position-space viewport + * @param reverseViewport is the viewport reversed (relative to end of table instead of beginning) * @param keyspaceViewport is the key-space viewport * @param subscribedColumns are the columns subscribed for this view * @return a MessageView filtered by the subscription properties that can be sent to that subscriber @@ -198,9 +199,11 @@ public void close() { public SubView getSubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, + final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - return new SubView(this, options, isInitialSnapshot, viewport, keyspaceViewport, subscribedColumns); + return new SubView(this, options, isInitialSnapshot, viewport, reverseViewport, keyspaceViewport, + subscribedColumns); } /** @@ -212,7 +215,7 @@ public SubView getSubView(final BarrageSubscriptionOptions options, */ @Override public SubView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot) { - return getSubView(options, isInitialSnapshot, null, null, null); + return getSubView(options, isInitialSnapshot, null, false, null, null); } public static class SubView implements View { @@ -220,6 +223,7 @@ public static class SubView implements View { public final BarrageSubscriptionOptions options; public final boolean isInitialSnapshot; public final RowSet viewport; + public final boolean reverseViewport; public final RowSet keyspaceViewport; public final BitSet subscribedColumns; public final boolean hasAddBatch; @@ -229,12 +233,14 @@ public SubView(final BarrageStreamGenerator generator, final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, + final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { this.generator = generator; this.options = options; this.isInitialSnapshot = isInitialSnapshot; this.viewport = viewport; + this.reverseViewport = reverseViewport; this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; this.hasModBatch = generator.doesSubViewHaveMods(this); @@ -273,6 +279,7 @@ public final RowSet keyspaceViewport() { * * @param options serialization options for this specific view * @param viewport is the position-space viewport + * @param reverseViewport is the viewport reversed (relative to end of table instead of beginning) * @param keyspaceViewport is the key-space viewport * @param snapshotColumns are the columns subscribed for this view * @return a MessageView filtered by the snapshot properties that can be sent to that subscriber @@ -280,9 +287,10 @@ public final RowSet keyspaceViewport() { @Override public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, @Nullable final RowSet viewport, + final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet snapshotColumns) { - return new SnapshotView(this, options, viewport, keyspaceViewport, snapshotColumns); + return new SnapshotView(this, options, viewport, reverseViewport, keyspaceViewport, snapshotColumns); } /** @@ -293,13 +301,14 @@ public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, */ @Override public SnapshotView getSnapshotView(BarrageSnapshotOptions options) { - return getSnapshotView(options, null, null, null); + return getSnapshotView(options, null, false, null, null); } public static class SnapshotView implements View { public final BarrageStreamGenerator generator; public final BarrageSnapshotOptions options; public final RowSet viewport; + public final boolean reverseViewport; public final RowSet keyspaceViewport; public final BitSet subscribedColumns; public final boolean hasAddBatch; @@ -308,11 +317,14 @@ public static class SnapshotView implements View { public SnapshotView(final BarrageStreamGenerator generator, final BarrageSnapshotOptions options, @Nullable final RowSet viewport, + final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { this.generator = generator; this.options = options; this.viewport = viewport; + this.reverseViewport = reverseViewport; + this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; this.hasModBatch = false; @@ -501,6 +513,7 @@ private long appendAddColumns(final View view, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { + // Added Chunk Data: final RowSet myAddedOffsets; if (view.isViewport()) { @@ -632,6 +645,7 @@ private ByteBuffer getSubscriptionMetadata(final SubView view) throws IOExceptio BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); + BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, view.reverseViewport); BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); BarrageUpdateMetadata.addRemovedRows(metadata, rowsRemovedOffset); BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); diff --git a/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java b/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java index 8041efc53a1..8b59ab94dd2 100644 --- a/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java +++ b/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java @@ -137,11 +137,29 @@ public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscript * * @param targetUri the target URI * @param table the table spec + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription * @return the subscribed table */ public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns) throws TableHandleException, InterruptedException { - return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns); + return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, false); + } + + /** + * Create a partial table subscription to the {@code table} via the {@code targetUri}. Uses {@link #SUB_OPTIONS}. + * + * @param targetUri the target URI + * @param table the table spec + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription + * @param reverseViewport Whether to treat {@code viewport} as offsets from {@link #size()} rather than {@code 0} + * @return the subscribed table + */ + public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns, boolean reverseViewport) + throws TableHandleException, InterruptedException { + return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, + reverseViewport); } /** @@ -150,14 +168,17 @@ public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSe * @param target the target * @param table the table * @param options the options + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription + * @param reverseViewport Whether to treat {@code viewport} as offsets from {@link #size()} rather than {@code 0} * @return the subscribed table */ public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options, RowSet viewport, - BitSet columns) + BitSet columns, boolean reverseViewport) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); final BarrageSubscription sub = session.subscribe(table, options); - return sub.partialTable(viewport, columns); + return sub.partialTable(viewport, columns, reverseViewport); } /** @@ -203,11 +224,29 @@ public Table snapshot(DeephavenTarget target, TableSpec table, BarrageSnapshotOp * * @param targetUri the target URI * @param table the table spec + * @param viewport the position-space viewport to use for the snapshot + * @param columns the columns to include in the snapshot * @return the table to snapshot */ public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns) throws TableHandleException, InterruptedException { - return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SNAP_OPTIONS, viewport, columns); + return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SNAP_OPTIONS, viewport, columns, false); + } + + /** + * Create a partial table snapshot to the {@code table} via the {@code targetUri}. Uses {@link #SNAP_OPTIONS}. + * + * @param targetUri the target URI + * @param table the table spec + * @param viewport the position-space viewport to use for the snapshot + * @param columns the columns to include in the snapshot + * @param reverseViewport Whether to treat {@code viewport} as offsets from {@link #size()} rather than {@code 0} + * @return the table to snapshot + */ + public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns, boolean reverseViewport) + throws TableHandleException, InterruptedException { + return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SNAP_OPTIONS, viewport, columns, + reverseViewport); } /** @@ -216,14 +255,17 @@ public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet * @param target the target * @param table the table * @param options the options + * @param viewport the position-space viewport to use for the snapshot + * @param columns the columns to include in the snapshot + * @param reverseViewport Whether to treat {@code viewport} as offsets from {@link #size()} rather than {@code 0} * @return the table to snapshot */ public Table snapshot(DeephavenTarget target, TableSpec table, BarrageSnapshotOptions options, RowSet viewport, - BitSet columns) + BitSet columns, boolean reverseViewport) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); final BarrageSnapshot snap = session.snapshot(table, options); - return snap.partialTable(viewport, columns); + return snap.partialTable(viewport, columns, reverseViewport); } diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java index f1b424702dc..fdb58246edb 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java @@ -41,6 +41,7 @@ import io.deephaven.server.arrow.ArrowModule; import io.deephaven.server.util.Scheduler; import io.deephaven.server.util.TestControlledScheduler; +import io.deephaven.tablelogger.Row; import io.deephaven.test.types.OutOfBandTest; import io.deephaven.time.DateTimeUtils; import io.deephaven.util.annotations.ReferentialIntegrity; @@ -153,6 +154,8 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { private class RemoteClient { private RowSet viewport; + private boolean reverseViewport; + private BitSet subscribedColumns; private final String name; @@ -175,13 +178,15 @@ private class RemoteClient { RemoteClient(final RowSet viewport, final BitSet subscribedColumns, final BarrageMessageProducer barrageMessageProducer, final String name) { - this(viewport, subscribedColumns, barrageMessageProducer, name, false); + // assume a forward viewport when not specified + this(viewport, subscribedColumns, barrageMessageProducer, name, false, false); } RemoteClient(final RowSet viewport, final BitSet subscribedColumns, final BarrageMessageProducer barrageMessageProducer, - final String name, final boolean deferSubscription) { + final String name, final boolean reverseViewport, final boolean deferSubscription) { this.viewport = viewport; + this.reverseViewport = reverseViewport; this.subscribedColumns = subscribedColumns; this.name = name; this.barrageMessageProducer = barrageMessageProducer; @@ -220,7 +225,7 @@ public void doSubscribe() { .useDeephavenNulls(useDeephavenNulls) .build(); barrageMessageProducer.addSubscription(dummyObserver, options, subscribedColumns, - viewport == null ? null : viewport.copy()); + viewport == null ? null : viewport.copy(), reverseViewport); } public void validate(final String msg, QueryTable expected) { @@ -232,8 +237,10 @@ public void validate(final String msg, QueryTable expected) { QueryTable toCheck = barrageTable; if (viewport != null) { - expected = expected.getSubTable(expected.getRowSet().subSetForPositions(viewport).toTracking()); - toCheck = toCheck.getSubTable(toCheck.getRowSet().subSetForPositions(viewport).toTracking()); + expected = expected + .getSubTable(expected.getRowSet().subSetForPositions(viewport, reverseViewport).toTracking()); + toCheck = toCheck + .getSubTable(toCheck.getRowSet().subSetForPositions(viewport, reverseViewport).toTracking()); } if (subscribedColumns.cardinality() != expected.getColumns().length) { final List columns = new ArrayList<>(); @@ -301,8 +308,15 @@ public void flushEventsToReplicatedTable() { } public void setViewport(final RowSet newViewport) { + // assume a forward viewport when not specified + setViewport(newViewport, false); + } + + public void setViewport(final RowSet newViewport, final boolean newReverseViewport) { viewport = newViewport; - barrageMessageProducer.updateViewport(dummyObserver, viewport); + reverseViewport = newReverseViewport; + + barrageMessageProducer.updateViewport(dummyObserver, viewport, reverseViewport); } public void setSubscribedColumns(final BitSet newColumns) { @@ -311,7 +325,14 @@ public void setSubscribedColumns(final BitSet newColumns) { } public void setViewportAndColumns(final RowSet newViewport, final BitSet newColumns) { + // assume a forward viewport when not specified + setViewportAndColumns(newViewport, newColumns, false); + } + + public void setViewportAndColumns(final RowSet newViewport, final BitSet newColumns, + final boolean newReverseViewport) { viewport = newViewport; + reverseViewport = newReverseViewport; subscribedColumns = newColumns; barrageMessageProducer.updateViewportAndColumns(dummyObserver, viewport, subscribedColumns); } @@ -366,7 +387,14 @@ public void flushClientEvents() { } public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColumns, final String name) { - clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, name)); + // assume a forward viewport when not specified + return newClient(viewport, subscribedColumns, false, name); + } + + public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColumns, + final boolean reverseViewport, final String name) { + clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, name, reverseViewport, + false)); return clients.get(clients.size() - 1); } @@ -472,13 +500,28 @@ void createNuggetsForTableMaker(final Supplier makeTable) { RowSetFactory.fromRange(size / 2, size * 3 / 4), subscribedColumns, "floating"); + nuggets.add(new RemoteNugget(makeTable)); + nuggets.get(nuggets.size() - 1).newClient( + RowSetFactory.fromRange(0, size / 10), + subscribedColumns, true, "footer"); + nuggets.add(new RemoteNugget(makeTable)); + nuggets.get(nuggets.size() - 1).newClient( + RowSetFactory.fromRange(size / 2, size * 3L / 4), + subscribedColumns, true, "reverse floating"); + final RowSetBuilderSequential swissIndexBuilder = RowSetFactory.builderSequential(); final long rangeSize = Math.max(1, size / 20); for (long nr = 1; nr < 20; nr += 2) { swissIndexBuilder.appendRange(nr * rangeSize, (nr + 1) * rangeSize - 1); } + final RowSet rs = swissIndexBuilder.build(); + + nuggets.add(new RemoteNugget(makeTable)); + nuggets.get(nuggets.size() - 1).newClient(rs, subscribedColumns, "swiss"); + + final RemoteNugget nugget = new RemoteNugget(makeTable); - nugget.newClient(swissIndexBuilder.build(), subscribedColumns, "swiss viewport"); + nugget.newClient(rs.copy(), subscribedColumns, true, "reverse swiss"); nuggets.add(nugget); } } @@ -495,18 +538,27 @@ void createNuggetsForTableMaker(final Supplier
makeTable) { final BitSet subscribedColumns = new BitSet(); subscribedColumns.set(0, nugget.originalTable.getColumns().length); + nugget.newClient(null, subscribedColumns, "full"); nugget.newClient(RowSetFactory.fromRange(0, size / 10), subscribedColumns, "header"); nugget.newClient(RowSetFactory.fromRange(size / 2, size * 3L / 4), subscribedColumns, "floating"); + nugget.newClient(RowSetFactory.fromRange(0, size / 10), subscribedColumns, true, "footer"); + nugget.newClient(RowSetFactory.fromRange(size / 2, size * 3L / 4), subscribedColumns, true, + "reverse floating"); + final RowSetBuilderSequential swissIndexBuilder = RowSetFactory.builderSequential(); final long rangeSize = Math.max(1, size / 20); for (long nr = 1; nr < 20; nr += 2) { swissIndexBuilder.appendRange(nr * rangeSize, (nr + 1) * rangeSize - 1); } - nugget.newClient(swissIndexBuilder.build(), subscribedColumns, "swiss viewport"); + + final RowSet rs = swissIndexBuilder.build(); + nugget.newClient(rs, subscribedColumns, "swiss"); + + nugget.newClient(rs.copy(), subscribedColumns, true, "reverse swiss"); } } @@ -800,7 +852,56 @@ void maybeChangeSub(final int step, final int rt, final int pt) { final RemoteClient client = nugget.clients.get(nugget.clients.size() - 1); final WritableRowSet viewport = client.viewport.copy(); viewport.shiftInPlace(Math.max(size / 25, 1)); - client.setViewport(viewport); + + // maintain viewport direction in this test + client.setViewport(viewport, client.reverseViewport); + } + } + }.runTest(); + } + } + } + } + } + } + + public void testViewportDirectionChange() { + for (final int size : new int[] {10, 100}) { + for (final int numProducerCoalesce : new int[] {1, 4}) { + for (final int numConsumerCoalesce : new int[] {1, 4}) { + for (int subProducerCoalesce = + 0; subProducerCoalesce < numProducerCoalesce; ++subProducerCoalesce) { + for (int subConsumerCoalesce = + 0; subConsumerCoalesce < numConsumerCoalesce; ++subConsumerCoalesce) { + final int finalSubProducerCoalesce = 0; + final int finalSubConsumerCoalesce = 1; + new SubscriptionChangingHelper(numProducerCoalesce, numConsumerCoalesce, size, 0, + new MutableInt(25)) { + @Override + void createNuggetsForTableMaker(final Supplier
makeTable) { + final RemoteNugget nugget = new RemoteNugget(makeTable); + nuggets.add(nugget); + + final BitSet columns = new BitSet(); + columns.set(0, 4); + nugget.clients.add( + new RemoteClient(RowSetFactory.fromRange(0, size / 5), + columns, nugget.barrageMessageProducer, "sub-changer")); + } + + void maybeChangeSub(final int step, final int rt, final int pt) { + if (step % 2 != 0 || rt != finalSubConsumerCoalesce + || pt != finalSubProducerCoalesce) { + return; + } + + for (final RemoteNugget nugget : nuggets) { + final RemoteClient client = nugget.clients.get(nugget.clients.size() - 1); + final WritableRowSet viewport = client.viewport.copy(); + viewport.shiftInPlace(Math.max(size / 25, 1)); + + // alternate viewport direction with every call to this function + client.setViewport(viewport, !client.reverseViewport); } } }.runTest(); @@ -896,7 +997,7 @@ public void onGetSnapshot() { final boolean deferSubscription = true; nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 2 * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer", deferSubscription)); + columns, nugget.barrageMessageProducer, "sub-changer", false, deferSubscription)); } }.runTest(); @@ -926,8 +1027,7 @@ public void createNuggets() { columns.set(0, 4); nugget.clients.add( new RemoteClient( - RowSetFactory.fromRange(size / 5, - 3 * size / 5), + RowSetFactory.fromRange(size / 5, 3 * size / 5), columns, nugget.barrageMessageProducer, "sub-changer")); } } @@ -941,7 +1041,9 @@ void maybeChangeSub(final int step, final int rt, final int pt) { final RemoteClient client = nugget.clients.get(nugget.clients.size() - 1); final WritableRowSet viewport = client.viewport.copy(); viewport.shiftInPlace(size / 5); - client.setViewport(viewport); + + // maintain viewport direction in this test + client.setViewport(viewport, client.reverseViewport); } } }.runTest(); @@ -978,7 +1080,7 @@ void maybeChangeSub(final int step, final int rt, final int pt) { final RemoteClient client = nugget.clients.get(nugget.clients.size() - 1); final int firstKey = random.nextInt(size); client.setViewport(RowSetFactory.fromRange(firstKey, - firstKey + random.nextInt(size - firstKey))); + firstKey + random.nextInt(size - firstKey)), client.reverseViewport); } } }.runTest();