Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reverse viewport logic on BarrageSnapshotRequest and BarrageSubscriptionRequest #1968

Merged
merged 24 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c0bdf60
work in progress
lbooker42 Feb 9, 2022
5c68ccf
builds, untested but shows potential impl
lbooker42 Feb 10, 2022
b580a0f
WIP commit
lbooker42 Feb 14, 2022
372f61a
tested worker to worker for snapshot & subscription
lbooker42 Feb 14, 2022
8ac9648
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 18, 2022
7829fdc
addressing review comments
lbooker42 Feb 18, 2022
791f1fc
spotless checks
lbooker42 Feb 18, 2022
96f4993
added reverseViewport to test utilities, existing tests pass
lbooker42 Feb 18, 2022
cfe2306
most tests passing (not swiss reverse)
lbooker42 Feb 22, 2022
bcc1802
added reverse viewport to unit tests, all passing
lbooker42 Feb 23, 2022
a1b3ea7
spotless checks completed
lbooker42 Feb 23, 2022
c7174e6
passing all existing tests
lbooker42 Feb 23, 2022
1127817
small change
lbooker42 Feb 23, 2022
5ae4553
new tests for , corrections to
lbooker42 Feb 23, 2022
f06eaf4
forward & reverse viewport transition tests added
lbooker42 Feb 23, 2022
006d6f9
addressing PR comments
lbooker42 Feb 23, 2022
cd80cb0
all tests pass
lbooker42 Feb 24, 2022
4728f48
more informative comments on scoping
lbooker42 Feb 24, 2022
a8c3fd3
spotless cleanup
lbooker42 Feb 24, 2022
cef5ef0
formatting changes
lbooker42 Feb 24, 2022
aa545c5
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 24, 2022
e98f56e
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 25, 2022
e74a3eb
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 28, 2022
516f53e
addressed PR comments
lbooker42 Feb 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,30 @@ public long binarySearchValue(TargetComparator targetComparator, int direction)
WritableRowSet subSetByKeyRange(long startKey, long endKey);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}.
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @param reversed Whether to treat {@code posRowSet} as offsets relative to {@link #size()} rather than {@code 0}
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of position-based ranges to extract.
* @return A new RowSet, containing values at the locations in the provided RowSet.
*/
WritableRowSet subSetForPositions(RowSet posRowSet);
WritableRowSet subSetForPositions(RowSequence posRowSequence);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions relative to {@link #size()} in
* {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForReversePositions(RowSequence posRowSequence);

/**
* Returns the row key at the given row position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.*;
import java.util.function.LongConsumer;

public class WritableRowSetImpl extends RowSequenceAsChunkImpl implements WritableRowSet, Externalizable {
Expand Down Expand Up @@ -335,21 +334,30 @@ public final WritableRowSet subSetByKeyRange(final long startKey, final long end
}

@Override
public final WritableRowSet subSetForPositions(RowSet posRowSet) {
public final WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed) {
if (reversed) {
return subSetForReversePositions(posRowSequence);
}
return subSetForPositions(posRowSequence);
}

@Override
public final WritableRowSet subSetForPositions(RowSequence positions) {
if (positions.isEmpty()) {
return RowSetFactory.empty();
}
final MutableLong currentOffset = new MutableLong();
final RowSequence.Iterator iter = getRowSequenceIterator();
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
posRowSet.forEachRowKeyRange((start, end) -> {
positions.forEachRowKeyRange((start, end) -> {
if (currentOffset.longValue() < start) {
// skip items until the beginning of this range
iter.getNextRowSequenceWithLength(start - currentOffset.longValue());
currentOffset.setValue(start);
}

if (!iter.hasMore()) {
return false;
}

iter.getNextRowSequenceWithLength(end + 1 - currentOffset.longValue())
.forAllRowKeyRanges(builder::appendRange);
currentOffset.setValue(end + 1);
Expand All @@ -358,6 +366,48 @@ public final WritableRowSet subSetForPositions(RowSet posRowSet) {
return builder.build();
}

@Override
public final WritableRowSet subSetForReversePositions(RowSequence positions) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
if (positions.isEmpty()) {
return RowSetFactory.empty();
}

final long lastRowPosition = size() - 1;
if (positions.size() == positions.lastRowKey() - positions.firstRowKey() + 1) {
// We have a single range in the input sequence
final long forwardEnd = lastRowPosition - positions.firstRowKey();
if (forwardEnd < 0) {
// The single range does not overlap with the available positions at all
return RowSetFactory.empty();
}
// Clamp the single range end to 0
final long forwardStart = Math.max(lastRowPosition - positions.lastRowKey(), 0);
try (final RowSequence forwardPositions = RowSequenceFactory.forRange(forwardStart, forwardEnd)) {
return subSetForPositions(forwardPositions);
}
}

// We have some non-trivial input sequence
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();
positions.forEachRowKeyRange((start, end) -> {
final long forwardEnd = lastRowPosition - start;
if (forwardEnd < 0) {
// This range does not overlap with the available positions at all, and thus neither can subsequent
// ranges that are offset further from the lastRowPosition.
return false;
}
// Clamp the range end to 0
final long forwardStart = Math.max(lastRowPosition - end, 0);
builder.addRange(forwardStart, forwardEnd);

// Continue iff subsequent ranges may overlap the available positions
return forwardStart != 0;
});
try (final RowSequence forwardPositions = builder.build()) {
return subSetForPositions(forwardPositions);
}
}

@Override
public final long get(final long rowPosition) {
return innerSet.ixGet(rowPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.rowset.impl.rsp.container.MutableInteger;
import io.deephaven.util.datastructures.LongRangeIterator;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableLongChunk;
Expand Down Expand Up @@ -3250,4 +3251,122 @@ public void testRemoveSrFromRspFullBlockRegression2() {
final OrderedLongSet result = rb.ixRemove(sr);
assertEquals(card - sr.getCardinality(), result.ixCardinality());
}

public void testSubSetForPositions() {
final RowSetBuilderSequential b = RowSetFactory.builderSequential();
final long[] vs = new long[] {3, 4, 5, 8, 10, 12, 29, 31, 44, 45, 46, 59, 60, 61, 72, 65537, 65539, 65536 * 3,
65536 * 3 + 5};
for (long v : vs) {
b.appendKey(v);
}
final RowSet ix = b.build();
final long sz = ix.size();

// test the empty ranges
try (final RowSet empty = RowSetFactory.empty()) {
assertEquals("empty range, fwd", ix.subSetForPositions(empty, false).size(), 0);
assertEquals("empty range, rev", ix.subSetForPositions(empty, true).size(), 0);
}

final int EXTRA_RANGE = 10;

// single range, deliberately allow some to be outside the upper boundary
for (int start = 0; start < vs.length; start++) {
for (int end = start; end <= vs.length + EXTRA_RANGE; end++) {

int coercedEnd = Math.min(end, vs.length - 1);
int expectedSize = coercedEnd - start + 1; // rowset ranges are inclusive

String m = "single range, s=" + start + ", end=" + end;
try (final RowSequence rs = RowSetFactory.fromRange(start, end)) {
try (final RowSet ret = ix.subSetForPositions(rs, false)) {
// verify the size is correct
assertEquals(m + ", fwd: size check", expectedSize, ret.size());
for (int i = 0; i < ret.size(); i++) {
int idx = start + i;
assertEquals(m + ", fwd: i=" + i, vs[idx], ret.get(i));
}
}
// now test the reversed functionality
try (final RowSet ret = ix.subSetForPositions(rs, true)) {
// verify the size is correct
assertEquals(m + ", rev: size check", expectedSize, ret.size());
int lastPos = vs.length - 1;
for (int i = 0; i < ret.size(); i++) {
int idx = lastPos - coercedEnd + i;
assertEquals(m + ", rev: i=" + i, vs[idx], ret.get(i));
}
}
}
}
}

// complex ranges, deliberately allow some to be outside the upper boundary
for (int rangeSize = 1; rangeSize < vs.length + EXTRA_RANGE + 1 / 2; rangeSize++) {
String m = "complex range, rangeSize=" + rangeSize;

final RowSetBuilderSequential rb = RowSetFactory.builderSequential();

int expectedSize = 0;

// create ranges of these sizes, skipping 1 value each time
long idx = 0;
while (idx < vs.length + EXTRA_RANGE) {
long s = idx;
long e = idx + rangeSize - 1;

// compute the expected size
if (s < vs.length) {
expectedSize += Math.min(e, vs.length - 1) - s + 1;
}

rb.appendRange(idx, e); // range is inclusive
idx += rangeSize + 1; // skip a value
}

try (final RowSequence rs = rb.build()) {
try (final RowSet ret = ix.subSetForPositions(rs, false)) {
// verify the size is correct
assertEquals(m + ", fwd: size check", expectedSize, ret.size());

// keep track of the index of the working set
final MutableInteger retIndex = new MutableInteger(0);

rs.forEachRowKeyRange((final long start, final long end) -> {
for (long i = start; i <= end; i++) {
int index = (int) i;
if (index >= 0 && index < vs.length) {
assertEquals(m + ", fwd: i=" + index, vs[index], ret.get(retIndex.value));
retIndex.value++;
}
}
return true;
});
}

// now test the reversed functionality
try (final RowSet ret = ix.subSetForPositions(rs, true)) {
// verify the size is correct
assertEquals(m + ", rev: size check", expectedSize, ret.size());

// keep track of the index of the working set
final MutableInteger retIndex = new MutableInteger(0);
final int lastPos = vs.length - 1;

rs.forEachRowKeyRange((final long start, final long end) -> {
// translate into reversed positions
for (long i = start; i <= end; i++) {
int index = lastPos - (int) i;
if (index >= 0 && index < vs.length) {
assertEquals(m + ", fwd: i=" + index, vs[index],
ret.get(ret.size() - retIndex.value - 1));
retIndex.value++;
}
}
return true;
});
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListener;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
Expand Down Expand Up @@ -522,7 +520,7 @@ static InitialSnapshot constructInitialSnapshotInPositionSpace(final Object logI
*/
public static BarrageMessage constructBackplaneSnapshot(final Object logIdentityObject,
final BaseTable table) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null);
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null, null);
}

/**
Expand All @@ -539,9 +537,10 @@ public static BarrageMessage constructBackplaneSnapshot(final Object logIdentity
public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Object logIdentityObject,
final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot) {
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, columnsToSerialize,
positionsToSnapshot, makeSnapshotControl(false, table));
positionsToSnapshot, reversePositionsToSnapshot, makeSnapshotControl(false, table));
}

/**
Expand All @@ -559,6 +558,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj
@NotNull final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot,
@NotNull final SnapshotControl control) {

final BarrageMessage snapshot = new BarrageMessage();
Expand All @@ -567,16 +567,30 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj

final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> {
final RowSet keysToSnapshot;
if (positionsToSnapshot == null) {
if (positionsToSnapshot == null && reversePositionsToSnapshot == null) {
keysToSnapshot = null;
} else if (usePrev) {
try (final RowSet prevRowSet = table.getRowSet().copyPrev()) {
keysToSnapshot = prevRowSet.subSetForPositions(positionsToSnapshot);
}
} else {
keysToSnapshot = table.getRowSet().subSetForPositions(positionsToSnapshot);
final RowSet rowSetToUse = usePrev ? table.getRowSet().copyPrev() : table.getRowSet();
try (final SafeCloseable ignored = usePrev ? rowSetToUse : null) {
final WritableRowSet forwardKeys =
positionsToSnapshot == null ? null : rowSetToUse.subSetForPositions(positionsToSnapshot);
final RowSet reverseKeys = reversePositionsToSnapshot == null ? null
: rowSetToUse.subSetForReversePositions(reversePositionsToSnapshot);
if (forwardKeys != null) {
if (reverseKeys != null) {
forwardKeys.insert(reverseKeys);
reverseKeys.close();
}
keysToSnapshot = forwardKeys;
} else {
keysToSnapshot = reverseKeys;
}
}
}
try (final RowSet ignored = keysToSnapshot) {
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize,
keysToSnapshot);
}
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
};

snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
Expand Down Expand Up @@ -1273,7 +1287,6 @@ public static boolean serializeAllTable(boolean usePrev,
* @param logIdentityObject an object for use with log() messages
* @param columnsToSerialize A {@link BitSet} of columns to include, null for all
* @param keysToSnapshot A RowSet of keys within the table to include, null for all
*
* @return true if the snapshot was computed with an unchanged clock, false otherwise.
*/
public static boolean serializeAllTable(final boolean usePrev,
Expand All @@ -1282,6 +1295,7 @@ public static boolean serializeAllTable(final boolean usePrev,
final Object logIdentityObject,
final BitSet columnsToSerialize,
final RowSet keysToSnapshot) {

snapshot.rowsAdded = (usePrev ? table.getRowSet().copyPrev() : table.getRowSet()).copy();
snapshot.rowsRemoved = RowSetFactory.empty();
snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static class AddColumnData {

public boolean isSnapshot;
public RowSet snapshotRowSet;
public boolean snapshotRowSetIsReversed;
public BitSet snapshotColumns;

public RowSet rowsAdded;
Expand Down
Loading