Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reverse viewport logic on BarrageSnapshotRequest and BarrageSubscriptionRequest #1968

Merged
merged 24 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c0bdf60
work in progress
lbooker42 Feb 9, 2022
5c68ccf
builds, untested but shows potential impl
lbooker42 Feb 10, 2022
b580a0f
WIP commit
lbooker42 Feb 14, 2022
372f61a
tested worker to worker for snapshot & subscription
lbooker42 Feb 14, 2022
8ac9648
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 18, 2022
7829fdc
addressing review comments
lbooker42 Feb 18, 2022
791f1fc
spotless checks
lbooker42 Feb 18, 2022
96f4993
added reverseViewport to test utilities, existing tests pass
lbooker42 Feb 18, 2022
cfe2306
most tests passing (not swiss reverse)
lbooker42 Feb 22, 2022
bcc1802
added reverse viewport to unit tests, all passing
lbooker42 Feb 23, 2022
a1b3ea7
spotless checks completed
lbooker42 Feb 23, 2022
c7174e6
passing all existing tests
lbooker42 Feb 23, 2022
1127817
small change
lbooker42 Feb 23, 2022
5ae4553
new tests for , corrections to
lbooker42 Feb 23, 2022
f06eaf4
forward & reverse viewport transition tests added
lbooker42 Feb 23, 2022
006d6f9
addressing PR comments
lbooker42 Feb 23, 2022
cd80cb0
all tests pass
lbooker42 Feb 24, 2022
4728f48
more informative comments on scoping
lbooker42 Feb 24, 2022
a8c3fd3
spotless cleanup
lbooker42 Feb 24, 2022
cef5ef0
formatting changes
lbooker42 Feb 24, 2022
aa545c5
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 24, 2022
e98f56e
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 25, 2022
e74a3eb
Merge branch 'deephaven:main' into lab-reverse-viewport
lbooker42 Feb 28, 2022
516f53e
addressed PR comments
lbooker42 Feb 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,30 @@ public long binarySearchValue(TargetComparator targetComparator, int direction)
WritableRowSet subSetByKeyRange(long startKey, long endKey);

/**
* Get a subset of this RowSet according to the supplied set of row positions in {@code posRowSet}.
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSet The RowSet of position-based ranges to extract.
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @param reversed Whether to treat {@code posRowSet} as offsets relative to {@link #size()} rather than {@code 0}
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions in {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of position-based ranges to extract.
* @return A new RowSet, containing values at the locations in the provided RowSet.
*/
WritableRowSet subSetForPositions(RowSet posRowSet);
WritableRowSet subSetForPositions(RowSequence posRowSequence);

/**
* Get a subset of this RowSet according to the supplied sequence of row positions relative to {@link #size()} in
* {@code posRowSequence}.
*
* @param posRowSequence The {@link RowSequence} of positions ranges to get (as in {@link #get(long)})
* @return A new RowSet, containing the row keys from this RowSet at the row positions in {@code posRowSequence}
*/
WritableRowSet subSetForReversePositions(RowSequence posRowSequence);

/**
* Returns the row key at the given row position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.*;
import java.util.function.LongConsumer;

public class WritableRowSetImpl extends RowSequenceAsChunkImpl implements WritableRowSet, Externalizable {
Expand Down Expand Up @@ -335,21 +334,30 @@ public final WritableRowSet subSetByKeyRange(final long startKey, final long end
}

@Override
public final WritableRowSet subSetForPositions(RowSet posRowSet) {
public final WritableRowSet subSetForPositions(RowSequence posRowSequence, boolean reversed) {
if (reversed) {
return subSetForReversePositions(posRowSequence);
}
return subSetForPositions(posRowSequence);
}

@Override
public final WritableRowSet subSetForPositions(RowSequence positions) {
if (positions.isEmpty()) {
return RowSetFactory.empty();
}
final MutableLong currentOffset = new MutableLong();
final RowSequence.Iterator iter = getRowSequenceIterator();
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
posRowSet.forEachRowKeyRange((start, end) -> {
positions.forEachRowKeyRange((start, end) -> {
if (currentOffset.longValue() < start) {
// skip items until the beginning of this range
iter.getNextRowSequenceWithLength(start - currentOffset.longValue());
currentOffset.setValue(start);
}

if (!iter.hasMore()) {
return false;
}

iter.getNextRowSequenceWithLength(end + 1 - currentOffset.longValue())
.forAllRowKeyRanges(builder::appendRange);
currentOffset.setValue(end + 1);
Expand All @@ -358,6 +366,48 @@ public final WritableRowSet subSetForPositions(RowSet posRowSet) {
return builder.build();
}

@Override
public final WritableRowSet subSetForReversePositions(RowSequence positions) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
if (positions.isEmpty()) {
return RowSetFactory.empty();
}

final long lastRowPosition = size() - 1;
if (positions.size() == positions.lastRowKey() - positions.firstRowKey() + 1) {
// We have a single range in the input sequence
final long forwardEnd = lastRowPosition - positions.firstRowKey();
if (forwardEnd < 0) {
// The single range does not overlap with the available positions at all
return RowSetFactory.empty();
}
// Clamp the single range end to 0
final long forwardStart = Math.max(lastRowPosition - positions.lastRowKey(), 0);
try (final RowSequence forwardPositions = RowSequenceFactory.forRange(forwardStart, forwardEnd)) {
return subSetForPositions(forwardPositions);
}
}

// We have some non-trivial input sequence
final RowSetBuilderRandom builder = RowSetFactory.builderRandom();
positions.forEachRowKeyRange((start, end) -> {
final long forwardEnd = lastRowPosition - start;
if (forwardEnd < 0) {
// This range does not overlap with the available positions at all, and thus neither can subsequent
// ranges that are offset further from the lastRowPosition.
return false;
}
// Clamp the range end to 0
final long forwardStart = Math.max(lastRowPosition - end, 0);
builder.addRange(forwardStart, forwardEnd);

// Continue iff subsequent ranges may overlap the available positions
return forwardStart != 0;
});
try (final RowSequence forwardPositions = builder.build()) {
return subSetForPositions(forwardPositions);
}
}

@Override
public final long get(final long rowPosition) {
return innerSet.ixGet(rowPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListener;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
Expand Down Expand Up @@ -522,7 +520,7 @@ static InitialSnapshot constructInitialSnapshotInPositionSpace(final Object logI
*/
public static BarrageMessage constructBackplaneSnapshot(final Object logIdentityObject,
final BaseTable table) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null);
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, null, null, null);
}

/**
Expand All @@ -539,9 +537,10 @@ public static BarrageMessage constructBackplaneSnapshot(final Object logIdentity
public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Object logIdentityObject,
final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot) {
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot) {
return constructBackplaneSnapshotInPositionSpace(logIdentityObject, table, columnsToSerialize,
positionsToSnapshot, makeSnapshotControl(false, table));
positionsToSnapshot, reversePositionsToSnapshot, makeSnapshotControl(false, table));
}

/**
Expand All @@ -559,6 +558,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj
@NotNull final BaseTable table,
@Nullable final BitSet columnsToSerialize,
@Nullable final RowSet positionsToSnapshot,
@Nullable final RowSet reversePositionsToSnapshot,
@NotNull final SnapshotControl control) {

final BarrageMessage snapshot = new BarrageMessage();
Expand All @@ -567,16 +567,29 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(final Obj

final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> {
final RowSet keysToSnapshot;
if (positionsToSnapshot == null) {
if (positionsToSnapshot == null && reversePositionsToSnapshot == null) {
keysToSnapshot = null;
} else if (usePrev) {
try (final RowSet prevRowSet = table.getRowSet().copyPrev()) {
keysToSnapshot = prevRowSet.subSetForPositions(positionsToSnapshot);
}
} else {
keysToSnapshot = table.getRowSet().subSetForPositions(positionsToSnapshot);
final RowSet rowSetToUse = usePrev ? table.getRowSet().copyPrev() : table.getRowSet();
try (final SafeCloseable ignored = usePrev ? rowSetToUse : null) {
final WritableRowSet forwardKeys = positionsToSnapshot == null ? null :
rowSetToUse.subSetForPositions(positionsToSnapshot);
final RowSet reverseKeys = reversePositionsToSnapshot == null ? null :
rowSetToUse.subSetForReversePositions(reversePositionsToSnapshot);
if (forwardKeys != null) {
if (reverseKeys != null) {
forwardKeys.insert(reverseKeys);
reverseKeys.close();
}
keysToSnapshot = forwardKeys;
} else {
keysToSnapshot = reverseKeys;
}
}
}
try (final RowSet ignored = keysToSnapshot) {
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
}
return serializeAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSerialize, keysToSnapshot);
};

snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
Expand Down Expand Up @@ -1268,20 +1281,20 @@ public static boolean serializeAllTable(boolean usePrev,
* {@link #constructBackplaneSnapshot} for simple use cases or {@link #callDataSnapshotFunction} for more advanced
* uses.
*
* @param usePrev Use previous values?
* @param snapshot The snapshot to populate
* @param logIdentityObject an object for use with log() messages
* @param usePrev Use previous values?
* @param snapshot The snapshot to populate
* @param logIdentityObject an object for use with log() messages
* @param columnsToSerialize A {@link BitSet} of columns to include, null for all
* @param keysToSnapshot A RowSet of keys within the table to include, null for all
*
* @param keysToSnapshot A RowSet of keys within the table to include, null for all
* @return true if the snapshot was computed with an unchanged clock, false otherwise.
*/
public static boolean serializeAllTable(final boolean usePrev,
final BarrageMessage snapshot,
final BaseTable table,
final Object logIdentityObject,
final BitSet columnsToSerialize,
final RowSet keysToSnapshot) {
final BarrageMessage snapshot,
final BaseTable table,
final Object logIdentityObject,
final BitSet columnsToSerialize,
final RowSet keysToSnapshot) {

snapshot.rowsAdded = (usePrev ? table.getRowSet().copyPrev() : table.getRowSet()).copy();
snapshot.rowsRemoved = RowSetFactory.empty();
snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()];
Expand All @@ -1301,7 +1314,7 @@ public static boolean serializeAllTable(final boolean usePrev,
final String[] columnSources = sourceMap.keySet().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY);

try (final SharedContext sharedContext =
(columnSources.length > 1) ? SharedContext.makeSharedContext() : null) {
(columnSources.length > 1) ? SharedContext.makeSharedContext() : null) {
for (int ii = 0; ii < columnSources.length; ++ii) {
if (concurrentAttemptInconsistent()) {
final LogEntry logEntry = log.info().append(System.identityHashCode(logIdentityObject))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static class AddColumnData {

public boolean isSnapshot;
public RowSet snapshotRowSet;
public boolean snapshotRowSetIsReversed;
public BitSet snapshotColumns;

public RowSet rowsAdded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,47 @@
*/
public class ShiftInversionHelper {

final RowSetShiftData shifted;
final private RowSetShiftData shifted;
final private boolean reverseOrder;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

private int destShiftIdx = 0;
private int destShiftIdx;

public ShiftInversionHelper(final RowSetShiftData shifted) {
// if not specified, assume forward viewport ordering
this(shifted, false);
}

public ShiftInversionHelper(final RowSetShiftData shifted, final boolean reverseOrder) {
this.shifted = shifted;
this.reverseOrder = reverseOrder;
this.destShiftIdx = reverseOrder ? shifted.size() : 0;
}

private void advanceDestShiftIdx(final long destKey) {
Assert.geq(destKey, "destKey", 0);
destShiftIdx = (int) binarySearch(destShiftIdx, shifted.size(), innerShiftIdx -> {
long destEnd = shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx);
// due to destKey's expected range, we know this subtraction will not overflow
return destEnd - destKey;
});
destShiftIdx = (int) binarySearch(
reverseOrder ? 0 : destShiftIdx,
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
reverseOrder ? destShiftIdx : shifted.size(),
innerShiftIdx -> {
long destEnd = shifted.getEndRange((int) innerShiftIdx) + shifted.getShiftDelta((int) innerShiftIdx);
// due to destKey's expected range, we know this subtraction will not overflow
return destEnd - destKey;
});
}

/**
* Converts post-keyspace key to pre-keyspace key. It expects to be invoked in ascending key order.
*/
public long mapToPrevKeyspace(final long key, final boolean isEnd) {
if (shifted.empty()) {
return key;
}

advanceDestShiftIdx(key);

final long retval;
final int idx = destShiftIdx;

if (idx < shifted.size() && shifted.getBeginRange(idx) + shifted.getShiftDelta(idx) <= key) {
// inside of a destination shift; this is easy to map to prev
retval = key - shifted.getShiftDelta(idx);
Expand Down
2 changes: 1 addition & 1 deletion extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
api project(':engine-table')
implementation project(':proto:proto-backplane-grpc-flight')
implementation project(':log-factory')
api "io.deephaven.barrage:barrage-format:0.4.0"
api "io.deephaven.barrage:barrage-format:0.4.1"

Classpaths.inheritFlatbuffer(project, 'implementation')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class BarrageTable extends QueryTable implements BarrageMessage.Listener,
* re-send data that the client should already have within the existing viewport.
*/
private RowSet serverViewport;
private boolean serverReverseViewport;
private BitSet serverColumns;


Expand Down Expand Up @@ -198,8 +199,7 @@ public void handleBarrageError(Throwable t) {
enqueueError(t);
}

private UpdateCoalescer processUpdate(final BarrageMessage update,
final UpdateCoalescer coalescer) {
private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateCoalescer coalescer) {
if (DEBUG_ENABLED) {
saveForDebugging(update);

Expand All @@ -223,6 +223,7 @@ private UpdateCoalescer processUpdate(final BarrageMessage update,

if (update.isSnapshot) {
serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy();
serverReverseViewport = update.snapshotRowSetIsReversed;
serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone();
}

Expand All @@ -233,7 +234,9 @@ private UpdateCoalescer processUpdate(final BarrageMessage update,

try (final RowSet currRowsFromPrev = currentRowSet.copy();
final WritableRowSet populatedRows =
(serverViewport != null ? currentRowSet.subSetForPositions(serverViewport) : null)) {
(serverViewport != null
? currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)
: null)) {

// removes
currentRowSet.remove(update.rowsRemoved);
Expand Down Expand Up @@ -307,7 +310,8 @@ private UpdateCoalescer processUpdate(final BarrageMessage update,

// remove all data outside of our viewport
if (serverViewport != null) {
try (final RowSet newPopulated = currentRowSet.subSetForPositions(serverViewport)) {
try (final RowSet newPopulated =
currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)) {
populatedRows.remove(newPopulated);
freeRows(populatedRows);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
msg = new BarrageMessage();

msg.isSnapshot = metadata.isSnapshot();
msg.snapshotRowSetIsReversed = metadata.effectiveReverseViewport();

numAddBatchesRemaining = metadata.numAddBatches();
numModBatchesRemaining = metadata.numModBatches();
if (numAddBatchesRemaining > 1 || numModBatchesRemaining > 1) {
Expand Down
Loading