Skip to content

Commit

Permalink
Implement reverse viewport logic on BarrageSnapshotRequest and `Bar…
Browse files Browse the repository at this point in the history
…rageSubscriptionRequest` (#1968)

* added reverseViewport to test utilities, confirm existing tests pass

* added capability to `BarrageMessageProducer`
  • Loading branch information
lbooker42 authored Feb 28, 2022
1 parent 81a9758 commit 05c79ad
Show file tree
Hide file tree
Showing 20 changed files with 730 additions and 139 deletions.
24 changes: 21 additions & 3 deletions engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,30 @@ public long binarySearchValue(TargetComparator targetComparator, int direction)
WritableRowSet subSetByKeyRange(long startKey, long endKey);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}.
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @param reversed Whether to treat {@code posRowSet} as offsets relative to {@link #size()} rather than {@code 0}
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of position-based ranges to extract.
* @return A new RowSet, containing values at the locations in the provided RowSet.
*/
WritableRowSet subSetForPositions(RowSet posRowSet);
WritableRowSet subSetForPositions(RowSequence posRowSequence);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions relative to {@link #size()} in
* {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForReversePositions(RowSequence posRowSequence);

/**
* Returns the row key at the given row position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.*;
import java.util.function.LongConsumer;

public class WritableRowSetImpl extends RowSequenceAsChunkImpl implements WritableRowSet, Externalizable {
Expand Down Expand Up @@ -335,21 +334,30 @@ public final WritableRowSet subSetByKeyRange(final long startKey, final long end
}

@Override
public final WritableRowSet subSetForPositions(RowSet posRowSet) {
public final WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed) {
if (reversed) {
return subSetForReversePositions(posRowSequence);
}
return subSetForPositions(posRowSequence);
}

@Override
public final WritableRowSet subSetForPositions(RowSequence positions) {
if (positions.isEmpty()) {
return RowSetFactory.empty();
}
final MutableLong currentOffset = new MutableLong();
final RowSequence.Iterator iter = getRowSequenceIterator();
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
posRowSet.forEachRowKeyRange((start, end) -> {
positions.forEachRowKeyRange((start, end) -> {
if (currentOffset.longValue() < start) {
// skip items until the beginning of this range
iter.getNextRowSequenceWithLength(start - currentOffset.longValue());
currentOffset.setValue(start);
}

if (!iter.hasMore()) {
return false;
}

iter.getNextRowSequenceWithLength(end + 1 - currentOffset.longValue())
.forAllRowKeyRanges(builder::appendRange);
currentOffset.setValue(end + 1);
Expand All @@ -358,6 +366,48 @@ public final WritableRowSet subSetForPositions(RowSet posRowSet) {
return builder.build();
}

@Override
public final WritableRowSet subSetForReversePositions(RowSequence positions) {
if (positions.isEmpty()) {
return RowSetFactory.empty();
}

final long lastRowPosition = size() - 1;
if (positions.size() == positions.lastRowKey() - positions.firstRowKey() + 1) {
// We have a single range in the input sequence
final long forwardEnd = lastRowPosition - positions.firstRowKey();
if (forwardEnd < 0) {
// The single range does not overlap with the available positions at all
return RowSetFactory.empty();
}
// Clamp the single range end to 0
final long forwardStart = Math.max(lastRowPosition - positions.lastRowKey(), 0);
try (final RowSequence forwardPositions = RowSequenceFactory.forRange(forwardStart, forwardEnd)) {
return subSetForPositions(forwardPositions);
}
}

// We have some non-trivial input sequence
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();
positions.forEachRowKeyRange((start, end) -> {
final long forwardEnd = lastRowPosition - start;
if (forwardEnd < 0) {
// This range does not overlap with the available positions at all, and thus neither can subsequent
// ranges that are offset further from the lastRowPosition.
return false;
}
// Clamp the range end to 0
final long forwardStart = Math.max(lastRowPosition - end, 0);
builder.addRange(forwardStart, forwardEnd);

// Continue iff subsequent ranges may overlap the available positions
return forwardStart != 0;
});
try (final RowSequence forwardPositions = builder.build()) {
return subSetForPositions(forwardPositions);
}
}

@Override
public final long get(final long rowPosition) {
return innerSet.ixGet(rowPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.rowset.impl.rsp.container.MutableInteger;
import io.deephaven.util.datastructures.LongRangeIterator;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableLongChunk;
Expand Down Expand Up @@ -3250,4 +3251,122 @@ public void testRemoveSrFromRspFullBlockRegression2() {
final OrderedLongSet result = rb.ixRemove(sr);
assertEquals(card - sr.getCardinality(), result.ixCardinality());
}

public void testSubSetForPositions() {
final RowSetBuilderSequential b = RowSetFactory.builderSequential();
final long[] vs = new long[] {3, 4, 5, 8, 10, 12, 29, 31, 44, 45, 46, 59, 60, 61, 72, 65537, 65539, 65536 * 3,
65536 * 3 + 5};
for (long v : vs) {
b.appendKey(v);
}
final RowSet ix = b.build();
final long sz = ix.size();

// test the empty ranges
try (final RowSet empty = RowSetFactory.empty()) {
assertEquals("empty range, fwd", ix.subSetForPositions(empty, false).size(), 0);
assertEquals("empty range, rev", ix.subSetForPositions(empty, true).size(), 0);
}

final int EXTRA_RANGE = 10;

// single range, deliberately allow some to be outside the upper boundary
for (int start = 0; start < vs.length; start++) {
for (int end = start; end <= vs.length + EXTRA_RANGE; end++) {

int coercedEnd = Math.min(end, vs.length - 1);
int expectedSize = coercedEnd - start + 1; // rowset ranges are inclusive

String m = "single range, s=" + start + ", end=" + end;
try (final RowSequence rs = RowSetFactory.fromRange(start, end)) {
try (final RowSet ret = ix.subSetForPositions(rs, false)) {
// verify the size is correct
assertEquals(m + ", fwd: size check", expectedSize, ret.size());
for (int i = 0; i < ret.size(); i++) {
int idx = start + i;
assertEquals(m + ", fwd: i=" + i, vs[idx], ret.get(i));
}
}
// now test the reversed functionality
try (final RowSet ret = ix.subSetForPositions(rs, true)) {
// verify the size is correct
assertEquals(m + ", rev: size check", expectedSize, ret.size());
int lastPos = vs.length - 1;
for (int i = 0; i < ret.size(); i++) {
int idx = lastPos - coercedEnd + i;
assertEquals(m + ", rev: i=" + i, vs[idx], ret.get(i));
}
}
}
}
}

// complex ranges, deliberately allow some to be outside the upper boundary
for (int rangeSize = 1; rangeSize < vs.length + EXTRA_RANGE + 1 / 2; rangeSize++) {
String m = "complex range, rangeSize=" + rangeSize;

final RowSetBuilderSequential rb = RowSetFactory.builderSequential();

int expectedSize = 0;

// create ranges of these sizes, skipping 1 value each time
long idx = 0;
while (idx < vs.length + EXTRA_RANGE) {
long s = idx;
long e = idx + rangeSize - 1;

// compute the expected size
if (s < vs.length) {
expectedSize += Math.min(e, vs.length - 1) - s + 1;
}

rb.appendRange(idx, e); // range is inclusive
idx += rangeSize + 1; // skip a value
}

try (final RowSequence rs = rb.build()) {
try (final RowSet ret = ix.subSetForPositions(rs, false)) {
// verify the size is correct
assertEquals(m + ", fwd: size check", expectedSize, ret.size());

// keep track of the index of the working set
final MutableInteger retIndex = new MutableInteger(0);

rs.forEachRowKeyRange((final long start, final long end) -> {
for (long i = start; i <= end; i++) {
int index = (int) i;
if (index >= 0 && index < vs.length) {
assertEquals(m + ", fwd: i=" + index, vs[index], ret.get(retIndex.value));
retIndex.value++;
}
}
return true;
});
}

// now test the reversed functionality
try (final RowSet ret = ix.subSetForPositions(rs, true)) {
// verify the size is correct
assertEquals(m + ", rev: size check", expectedSize, ret.size());

// keep track of the index of the working set
final MutableInteger retIndex = new MutableInteger(0);
final int lastPos = vs.length - 1;

rs.forEachRowKeyRange((final long start, final long end) -> {
// translate into reversed positions
for (long i = start; i <= end; i++) {
int index = lastPos - (int) i;
if (index >= 0 && index < vs.length) {
assertEquals(m + ", fwd: i=" + index, vs[index],
ret.get(ret.size() - retIndex.value - 1));
retIndex.value++;
}
}
return true;
});
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListener;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
Expand Down Expand Up @@ -522,7 +520,7 @@ static InitialSnapshot constructInitialSnapshotInPositionSpace(final Object logI
*/
public static BarrageMessage constructBackplaneSnapshot(final Object logIdentityObject,
final BaseTable table) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null);
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null, null);
}

/**
Expand All @@ -539,9 +537,10 @@ public static BarrageMessage constructBackplaneSnapshot(final Object logIdentity
public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Object logIdentityObject,
final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot) {
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, columnsToSerialize,
positionsToSnapshot, makeSnapshotControl(false, table));
positionsToSnapshot, reversePositionsToSnapshot, makeSnapshotControl(false, table));
}

/**
Expand All @@ -559,6 +558,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj
@NotNull final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot,
@NotNull final SnapshotControl control) {

final BarrageMessage snapshot = new BarrageMessage();
Expand All @@ -567,16 +567,30 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj

final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> {
final RowSet keysToSnapshot;
if (positionsToSnapshot == null) {
if (positionsToSnapshot == null && reversePositionsToSnapshot == null) {
keysToSnapshot = null;
} else if (usePrev) {
try (final RowSet prevRowSet = table.getRowSet().copyPrev()) {
keysToSnapshot = prevRowSet.subSetForPositions(positionsToSnapshot);
}
} else {
keysToSnapshot = table.getRowSet().subSetForPositions(positionsToSnapshot);
final RowSet rowSetToUse = usePrev ? table.getRowSet().copyPrev() : table.getRowSet();
try (final SafeCloseable ignored = usePrev ? rowSetToUse : null) {
final WritableRowSet forwardKeys =
positionsToSnapshot == null ? null : rowSetToUse.subSetForPositions(positionsToSnapshot);
final RowSet reverseKeys = reversePositionsToSnapshot == null ? null
: rowSetToUse.subSetForReversePositions(reversePositionsToSnapshot);
if (forwardKeys != null) {
if (reverseKeys != null) {
forwardKeys.insert(reverseKeys);
reverseKeys.close();
}
keysToSnapshot = forwardKeys;
} else {
keysToSnapshot = reverseKeys;
}
}
}
try (final RowSet ignored = keysToSnapshot) {
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize,
keysToSnapshot);
}
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
};

snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
Expand Down Expand Up @@ -1273,7 +1287,6 @@ public static boolean serializeAllTable(boolean usePrev,
* @param logIdentityObject an object for use with log() messages
* @param columnsToSerialize A {@link BitSet} of columns to include, null for all
* @param keysToSnapshot A RowSet of keys within the table to include, null for all
*
* @return true if the snapshot was computed with an unchanged clock, false otherwise.
*/
public static boolean serializeAllTable(final boolean usePrev,
Expand All @@ -1282,6 +1295,7 @@ public static boolean serializeAllTable(final boolean usePrev,
final Object logIdentityObject,
final BitSet columnsToSerialize,
final RowSet keysToSnapshot) {

snapshot.rowsAdded = (usePrev ? table.getRowSet().copyPrev() : table.getRowSet()).copy();
snapshot.rowsRemoved = RowSetFactory.empty();
snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static class AddColumnData {

public boolean isSnapshot;
public RowSet snapshotRowSet;
public boolean snapshotRowSetIsReversed;
public BitSet snapshotColumns;

public RowSet rowsAdded;
Expand Down
Loading

0 comments on commit 05c79ad

Please sign in to comment.