Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reverse viewport logic on BarrageSnapshotRequest and BarrageSubscriptionRequest #1968

Merged
merged 24 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c0bdf60
work in progress
lbooker42 Feb 9, 2022
5c68ccf
builds, untested but shows potential impl
lbooker42 Feb 10, 2022
b580a0f
WIP commit
lbooker42 Feb 14, 2022
372f61a
tested worker to worker for snapshot & subscription
lbooker42 Feb 14, 2022
8ac9648
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 18, 2022
7829fdc
addressing review comments
lbooker42 Feb 18, 2022
791f1fc
spotless checks
lbooker42 Feb 18, 2022
96f4993
added reverseViewport to test utilities, existing tests pass
lbooker42 Feb 18, 2022
cfe2306
most tests passing (not swiss reverse)
lbooker42 Feb 22, 2022
bcc1802
added reverse viewport to unit tests, all passing
lbooker42 Feb 23, 2022
a1b3ea7
spotless checks completed
lbooker42 Feb 23, 2022
c7174e6
passing all existing tests
lbooker42 Feb 23, 2022
1127817
small change
lbooker42 Feb 23, 2022
5ae4553
new tests for , corrections to
lbooker42 Feb 23, 2022
f06eaf4
forward & reverse viewport transition tests added
lbooker42 Feb 23, 2022
006d6f9
addressing PR comments
lbooker42 Feb 23, 2022
cd80cb0
all tests pass
lbooker42 Feb 24, 2022
4728f48
more informative comments on scoping
lbooker42 Feb 24, 2022
a8c3fd3
spotless cleanup
lbooker42 Feb 24, 2022
cef5ef0
formatting changes
lbooker42 Feb 24, 2022
aa545c5
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 24, 2022
e98f56e
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 25, 2022
e74a3eb
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 28, 2022
516f53e
addressed PR comments
lbooker42 Feb 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,30 +428,30 @@ public long binarySearchValue(TargetComparator targetComparator, int direction)
WritableRowSet subSetByKeyRange(long startKey, long endKey);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}.
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @param reversed use a reversed view of the table (index 0 is last row of table)
* @return A new RowSet, containing values at the locations in the provided RowSet.
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @param reversed Whether to treat {@code posRowSet} as offsets relative to {@link #size()} rather than {@code 0}
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForPositions(RowSet posRowSet, boolean reversed);
WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}.
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @param posRowSequence The {@link RowSequence} of position-based ranges to extract.
* @return A new RowSet, containing values at the locations in the provided RowSet.
*/
WritableRowSet subSetForPositions(RowSet posRowSet);
WritableRowSet subSetForPositions(RowSequence posRowSequence);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet} in reversed
* position space (index 0 is last row of table)
* Get a subset of this RowSet according to the supplied sequence of row positions relative to {@link #size()} in
* {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @return A new RowSet, containing values at the locations in the provided RowSet.
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForReversePositions(RowSet posRowSet);
WritableRowSet subSetForReversePositions(RowSequence posRowSequence);

/**
* Returns the row key at the given row position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,29 +334,30 @@ public final WritableRowSet subSetByKeyRange(final long startKey, final long end
}

@Override
public final WritableRowSet subSetForPositions(RowSet posRowSet, boolean reversed) {
public final WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed) {
if (reversed) {
return subSetForReversePositions(posRowSet);
return subSetForReversePositions(posRowSequence);
}
return subSetForPositions(posRowSet);
return subSetForPositions(posRowSequence);
}

@Override
public final WritableRowSet subSetForPositions(RowSet posRowSet) {
public final WritableRowSet subSetForPositions(RowSequence positions) {
if (positions.isEmpty()) {
return RowSetFactory.empty();
}
final MutableLong currentOffset = new MutableLong();
final RowSequence.Iterator iter = getRowSequenceIterator();
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
posRowSet.forEachRowKeyRange((start, end) -> {
positions.forEachRowKeyRange((start, end) -> {
if (currentOffset.longValue() < start) {
// skip items until the beginning of this range
iter.getNextRowSequenceWithLength(start - currentOffset.longValue());
currentOffset.setValue(start);
}

if (!iter.hasMore()) {
return false;
}

iter.getNextRowSequenceWithLength(end + 1 - currentOffset.longValue())
.forAllRowKeyRanges(builder::appendRange);
currentOffset.setValue(end + 1);
Expand All @@ -366,17 +367,45 @@ public final WritableRowSet subSetForPositions(RowSet posRowSet) {
}

@Override
public final WritableRowSet subSetForReversePositions(RowSet posRowSet) {
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();
public final WritableRowSet subSetForReversePositions(RowSequence positions) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
if (positions.isEmpty()) {
return RowSetFactory.empty();
}

final long lastRowPosition = size() - 1;
if (positions.size() == positions.lastRowKey() - positions.firstRowKey() + 1) {
// We have a single range in the input sequence
final long forwardEnd = lastRowPosition - positions.firstRowKey();
if (forwardEnd < 0) {
// The single range does not overlap with the available positions at all
return RowSetFactory.empty();
}
// Clamp the single range end to 0
final long forwardStart = Math.max(lastRowPosition - positions.lastRowKey(), 0);
try (final RowSequence forwardPositions = RowSequenceFactory.forRange(forwardStart, forwardEnd)) {
return subSetForPositions(forwardPositions);
}
}

long lastIndex = size() - 1;
// We have some non-trivial input sequence
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();
positions.forEachRowKeyRange((start, end) -> {
final long forwardEnd = lastRowPosition - start;
if (forwardEnd < 0) {
// This range does not overlap with the available positions at all, and thus neither can subsequent
// ranges that are offset further from the lastRowPosition.
return false;
}
// Clamp the range end to 0
final long forwardStart = Math.max(lastRowPosition - end, 0);
builder.addRange(forwardStart, forwardEnd);

posRowSet.forEachRowKeyRange((start, end) -> {
builder.addRange(lastIndex - end, lastIndex - start);
return true;
// Continue iff subsequent ranges may overlap the available positions
return forwardStart != 0;
});

return subSetForPositions(builder.build());
try (final RowSequence forwardPositions = builder.build()) {
return subSetForPositions(forwardPositions);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,29 +570,26 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj
if (positionsToSnapshot == null && reversePositionsToSnapshot == null) {
keysToSnapshot = null;
} else {
final RowSetBuilderRandom keyBuilder = RowSetFactory.builderRandom();
if (usePrev) {
// perform actions on the previous rowset
try (final RowSet prevRowSet = table.getRowSet().copyPrev()) {
if (positionsToSnapshot != null) {
keyBuilder.addRowSet(prevRowSet.subSetForPositions(positionsToSnapshot));
final RowSet rowSetToUse = usePrev ? table.getRowSet().copyPrev() : table.getRowSet();
try (final SafeCloseable ignored = usePrev ? rowSetToUse : null) {
final WritableRowSet forwardKeys = positionsToSnapshot == null ? null :
rowSetToUse.subSetForPositions(positionsToSnapshot);
final RowSet reverseKeys = reversePositionsToSnapshot == null ? null :
rowSetToUse.subSetForReversePositions(reversePositionsToSnapshot);
if (forwardKeys != null) {
if (reverseKeys != null) {
forwardKeys.insert(reverseKeys);
reverseKeys.close();
}
if (reversePositionsToSnapshot != null) {
keyBuilder.addRowSet(prevRowSet.subSetForReversePositions(reversePositionsToSnapshot));
}
}
} else {
// perform actions on the current rowset
if (positionsToSnapshot != null) {
keyBuilder.addRowSet(table.getRowSet().subSetForPositions(positionsToSnapshot));
}
if (reversePositionsToSnapshot != null) {
keyBuilder.addRowSet(table.getRowSet().subSetForReversePositions(reversePositionsToSnapshot));
keysToSnapshot = forwardKeys;
} else {
keysToSnapshot = reverseKeys;
}
}
keysToSnapshot = keyBuilder.build();
}
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
try (final RowSet ignored = keysToSnapshot) {
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
}
};

snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class AddColumnData {

public boolean isSnapshot;
public RowSet snapshotRowSet;
public boolean snapshotReverseRowSet;
public boolean snapshotRowSetIsReversed;
public BitSet snapshotColumns;

public RowSet rowsAdded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,47 @@
*/
public class ShiftInversionHelper {

final RowSetShiftData shifted;
final private RowSetShiftData shifted;
final private boolean reverseOrder;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

private int destShiftIdx = 0;
private int destShiftIdx;

public ShiftInversionHelper(final RowSetShiftData shifted) {
// if not specified, assume forward viewport ordering
this(shifted, false);
}

public ShiftInversionHelper(final RowSetShiftData shifted, final boolean reverseOrder) {
this.shifted = shifted;
this.reverseOrder = reverseOrder;
this.destShiftIdx = reverseOrder ? shifted.size() : 0;
}

private void advanceDestShiftIdx(final long destKey) {
Assert.geq(destKey, "destKey", 0);
destShiftIdx = (int) binarySearch(destShiftIdx, shifted.size(), innerShiftIdx -> {
long destEnd = shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx);
// due to destKey's expected range, we know this subtraction will not overflow
return destEnd - destKey;
});
destShiftIdx = (int) binarySearch(
reverseOrder ? 0 : destShiftIdx,
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
reverseOrder ? destShiftIdx : shifted.size(),
innerShiftIdx -> {
long destEnd = shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx);
// due to destKey's expected range, we know this subtraction will not overflow
return destEnd - destKey;
});
}

/**
* Converts post-keyspace key to pre-keyspace key. It expects to be invoked in ascending key order.
*/
public long mapToPrevKeyspace(final long key, final boolean isEnd) {
if (shifted.empty()) {
return key;
}

advanceDestShiftIdx(key);

final long retval;
final int idx = destShiftIdx;

if (idx < shifted.size() && shifted.getBeginRange(idx) + shifted.getShiftDelta(idx) <= key) {
// inside of a destination shift; this is easy to map to prev
retval = key - shifted.getShiftDelta(idx);
Expand Down
2 changes: 1 addition & 1 deletion extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
api project(':engine-table')
implementation project(':proto:proto-backplane-grpc-flight')
implementation project(':log-factory')
api "io.deephaven.barrage:barrage-format:0.4.0"
api "io.deephaven.barrage:barrage-format:0.4.1"

Classpaths.inheritFlatbuffer(project, 'implementation')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC

if (update.isSnapshot) {
serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy();
serverReverseViewport = update.snapshotReverseRowSet;
serverReverseViewport = update.snapshotRowSetIsReversed;
serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
msg = new BarrageMessage();

msg.isSnapshot = metadata.isSnapshot();
msg.snapshotReverseRowSet = metadata.effectiveReverseViewport();
msg.snapshotRowSetIsReversed = metadata.effectiveReverseViewport();

numAddBatchesRemaining = metadata.numAddBatches();
numModBatchesRemaining = metadata.numModBatches();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected void execute(final BarrageSession client) throws Exception {

try (final TableHandle handle = manager.executeLogic(logic());
final BarrageSubscription subscription = client.subscribe(handle, options)) {
// final BarrageTable table = subscription.entireTable();

final BarrageTable table = subscription.partialTable(RowSetFactory.flat(10), null, true);
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options)
* Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with
* the data that is received.
*
* @param viewport the position-space viewport to use for the snapshot
* @param columns the columns to include in the snapshot
*
* @return the {@code BarrageTable}
*/
BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException;
Expand All @@ -58,6 +61,11 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options)
* Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with
* the data that is received. Allows the viewport to be reversed.
*
* @param viewport the position-space viewport to use for the snapshot
* @param columns the columns to include in the snapshot
* @param reverseViewport Whether to treat {@code posRowSet} as offsets from to {@link #size()} rather than
* {@code 0}
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
*
* @return the {@code BarrageTable}
*/
BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) t
}

@Override
public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException {
public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport)
throws InterruptedException {
// notify user when connection has already been used and closed
if (prevUsed) {
throw new UnsupportedOperationException("Snapshot object already used");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ BarrageSubscription subscribe(TableSpec tableSpec, BarrageSubscriptionOptions op
* Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable}
* with the data that is received.
*
* @param viewport the position-space viewport to use for the subscription
* @param columns the columns to include in the subscription
*
* @return the {@code BarrageTable}
*/
BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException;
Expand All @@ -60,6 +63,11 @@ BarrageSubscription subscribe(TableSpec tableSpec, BarrageSubscriptionOptions op
* Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable}
* with the data that is received. Allows the viewport to be reversed.
*
* @param viewport the position-space viewport to use for the subscription
* @param columns the columns to include in the subscription
* @param reverseViewport Whether to treat {@code posRowSet} as offsets from to {@link #size()} rather than
* {@code 0} *
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
*
* @return the {@code BarrageTable}
*/
BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, b
if (!subscribed) {
// Send the initial subscription:
observer.onNext(FlightData.newBuilder()
.setAppMetadata(ByteStringAccess.wrap(makeRequestInternal(viewport, columns, reverseViewport, options)))
.setAppMetadata(
ByteStringAccess.wrap(makeRequestInternal(viewport, columns, reverseViewport, options)))
.build());
subscribed = true;
}
Expand Down
Loading