Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a query paginator #1530

Merged
merged 9 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
Expand Down Expand Up @@ -248,6 +249,29 @@ public List<Query> shard(SortedSet<ByteString> splitPoints) {
return shards;
}

/**
* Create a query paginator that'll split the query into smaller chunks.
*
* <p>Example usage:
*
* <pre>{@code
* Query query = Query.create(...).range("a", "z");
* Query.QueryPaginator paginator = query.createQueryPaginator(100);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
* ByteString lastSeenRowKey = ByteString.EMPTY;
* while (paginator.advance(lastSeenRowKey)) {
* List<Row> rows = client.readRowsCallable().all().call(paginator.getNextQuery());
* for (Row row : rows) {
* // do some processing
* lastSeenRow = row;
* }
* }
* }</pre>
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public QueryPaginator createQueryPaginator(int chunkSize) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return new QueryPaginator(this, chunkSize);
}

/** Get the minimal range that encloses all of the row keys and ranges in this Query. */
public ByteStringRange getBound() {
return RowSetUtil.getBound(builder.getRows());
Expand Down Expand Up @@ -297,6 +321,75 @@ private static ByteString wrapKey(String key) {
return ByteString.copyFromUtf8(key);
}

/**
* A Query Paginator that will split a query into small chunks. See {@link
* Query#createQueryPaginator(int)} for example usage.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public class QueryPaginator {
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private final long originalLimit;
mutianf marked this conversation as resolved.
Show resolved Hide resolved
private long newLimit;
mutianf marked this conversation as resolved.
Show resolved Hide resolved
private Query query;
private final int chunkSize;
private ByteString prevSplitPoint;
private boolean firstRun;

QueryPaginator(@Nonnull Query query, int chunkSize) {
this.originalLimit = query.builder.getRowsLimit();
this.newLimit = query.builder.getRowsLimit();
this.query = query;
this.chunkSize = chunkSize;
this.prevSplitPoint = ByteString.EMPTY;
this.firstRun = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to set the limit on the query here? otherwise gtNextQuery will return the wrong thing

Copy link
Contributor Author

@mutianf mutianf Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm this should be handled by advnace and getNextQuery needs to be called after advance:

Query query = Query.create(...).range("a", "z");
Query.QueryPaginator paginator = query.createQueryPaginator(100);
ByteString lastSeenRowKey = ByteString.EMPTY;
while (paginator.advance(lastSeenRowKey)) {
    List<Row> rows = client.readRowsCallable().all().call(paginator.getNextQuery());
      for (Row row : rows) {
      // do some processing
     lastSeenRow = row;
    }
 }

}

/** Return the next query. Needs to be called after advance(). */
Query getNextQuery() {
return query;
}

/**
* Construct the next query. Return true if there are more queries to return. False if we've
* read everything.
*/
boolean advance(@Nonnull ByteString lastSeenRowKey) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(
lastSeenRowKey != null, "lastSeenRowKey cannot be null, use ByteString.EMPTY instead.");
mutianf marked this conversation as resolved.
Show resolved Hide resolved
// Full table scans don't have ranges or limits. Keep track of the previous split
// point. If it's the same as the current input return false. The only exception
// is the first run for this paginator. So keep track of the first run state too.
if (!firstRun && prevSplitPoint.equals(lastSeenRowKey)) {
return false;
}
mutianf marked this conversation as resolved.
Show resolved Hide resolved
if (firstRun) {
firstRun = false;
}
this.prevSplitPoint = lastSeenRowKey;

// Set the query limit. If the original limit is set, return false if the new
// limit is <= 0 to avoid returning more rows than intended.
if (originalLimit != 0 && newLimit <= 0) {
return false;
}
if (originalLimit != 0) {
query.limit(Math.min(this.chunkSize, newLimit));
newLimit -= chunkSize;
} else {
query.limit(chunkSize);
}

// Split the row ranges / row keys. Return false if there's nothing
// left on the right of the split point.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
return false;
}
query.builder.setRows(split.getRight());
return true;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,190 @@ public void testClone() {
assertThat(clonedReq).isEqualTo(query);
assertThat(clonedReq.toProto(requestContext)).isEqualTo(request);
}

@Test
public void testQueryPaginatorRangeLimitReached() {
int chunkSize = 10, limit = 15;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createQueryPaginator(chunkSize);
assertThat(paginator.advance(ByteString.EMPTY)).isTrue();

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRangeLimitMultiplyOfChunkSize() {
int chunkSize = 10, limit = 20;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createQueryPaginator(chunkSize);
assertThat(paginator.advance(ByteString.EMPTY)).isTrue();

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRagneNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).range("a", "z");
Query.QueryPaginator paginator = query.createQueryPaginator(chunkSize);
assertThat(paginator.advance(ByteString.EMPTY)).isTrue();

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
nextQuery = paginator.getNextQuery();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("z"))).isFalse();
}

@Test
public void testQueryPaginatorRowsNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b").rowKey("c");

Query.QueryPaginator paginator = query.createQueryPaginator(chunkSize);
assertThat(paginator.advance(ByteString.EMPTY)).isTrue();

Query nextQuery = paginator.getNextQuery();

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder();
expectedProto
.getRowsBuilder()
.addRowKeys(ByteString.copyFromUtf8("a"))
.addRowKeys(ByteString.copyFromUtf8("b"))
.addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

paginator.advance(ByteString.copyFromUtf8("b"));
nextQuery = paginator.getNextQuery();
expectedProto = expectedProtoBuilder();
expectedProto.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isFalse();
}

@Test
public void testQueryPaginatorFullTableScan() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createQueryPaginator(chunkSize);
assertThat(queryPaginator.advance(ByteString.EMPTY)).isTrue();

ReadRowsRequest.Builder expectedProto =
expectedProtoBuilder().setRows(RowSet.getDefaultInstance()).setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isTrue();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a")).build()))
.setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isFalse();
}

@Test
public void testQueryPaginatorEmptyTable() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createQueryPaginator(chunkSize);
assertThat(queryPaginator.advance(ByteString.EMPTY)).isTrue();

ReadRowsRequest.Builder expectedProto =
expectedProtoBuilder().setRows(RowSet.getDefaultInstance()).setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.EMPTY)).isFalse();
}
}