Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add getNewPartitions method to CloseStream for Bigtable ChangeStream #1655

Merged
merged 4 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- add new method is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getNewPartitions*</method>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
Expand All @@ -35,8 +37,22 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {

private static CloseStream create(
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
List<ByteStringRange> newPartitions) {
if (status.getCode() == 0) {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkState(
changeStreamContinuationTokens.isEmpty(),
"An OK CloseStream should not have continuation tokens.");
} else {
Preconditions.checkState(
!changeStreamContinuationTokens.isEmpty(),
"A non-OK CloseStream should have continuation token(s).");
Preconditions.checkState(
changeStreamContinuationTokens.size() == newPartitions.size(),
"Number of continuation tokens does not match number of new partitions.");
}
return new AutoValue_CloseStream(
Status.fromProto(status), changeStreamContinuationTokens, newPartitions);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand All @@ -46,6 +62,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
closeStream.getStatus(),
closeStream.getContinuationTokensList().stream()
.map(ChangeStreamContinuationToken::fromProto)
.collect(ImmutableList.toImmutableList()),
closeStream.getNewPartitionsList().stream()
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
.map(
newPartition ->
ByteStringRange.create(
newPartition.getRowRange().getStartKeyClosed(),
newPartition.getRowRange().getEndKeyOpen()))
.collect(ImmutableList.toImmutableList()));
}

Expand All @@ -56,4 +79,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ChangeStreamContinuationToken> getChangeStreamContinuationTokens();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ByteStringRange> getNewPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
public class ChangeStreamRecordTest {

@Rule public ExpectedException expect = ExpectedException.none();

@Test
public void heartbeatSerializationTest() throws IOException, ClassNotFoundException {
ReadChangeStreamResponse.Heartbeat heartbeatProto =
Expand All @@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept

@Test
public void closeStreamSerializationTest() throws IOException, ClassNotFoundException {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -85,6 +91,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -98,6 +106,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}

@Test
Expand Down Expand Up @@ -129,7 +138,7 @@ public void heartbeatTest() {

@Test
public void closeStreamTest() {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -154,6 +163,8 @@ public void closeStreamTest() {
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -169,5 +180,65 @@ public void closeStreamTest() {
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
assertThat(token2)
.isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken());
assertThat(actualCloseStream.getNewPartitions().get(0))
.isEqualTo(
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
assertThat(actualCloseStream.getNewPartitions().get(1))
.isEqualTo(
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
}

// Tests that an OK CloseStream should not have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamOkWithContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(0).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that a non-OK CloseStream should have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamErrorWithoutContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(11).build();
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that the number of continuation tokens should match the number of new partitions.
@Test(expected = IllegalStateException.class)
public void closeStreamTokenAndNewPartitionCountMismatchedTest() {
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(streamContinuationToken)
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.setStatus(Status.newBuilder().setCode(0).build())
.build();
ReadChangeStreamResponse response =
Expand All @@ -127,5 +128,8 @@ public void closeStreamTest() {
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
assertThat(changeStreamContinuationToken.getToken())
.isEqualTo(streamContinuationToken.getToken());
assertThat(closeStream.getNewPartitions().size()).isEqualTo(1);
assertThat(closeStream.getNewPartitions().get(0))
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile;
Expand Down Expand Up @@ -173,6 +174,14 @@ public void test() throws Exception {
.setToken(token.getToken())
.build());
}
for (ByteStringRange newPartition : closeStream.getNewPartitions()) {
builder.addNewPartitions(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(newPartition.getStart())
.setEndKeyOpen(newPartition.getEnd())));
}
ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
actualResults.add(
ReadChangeStreamTest.Result.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to
.build();
}

private StreamPartition createNewPartitionForCloseStream() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
.setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)))
.build();
}

private ReadChangeStreamResponse.Heartbeat createHeartbeat(
StreamContinuationToken streamContinuationToken) {
return ReadChangeStreamResponse.Heartbeat.newBuilder()
Expand All @@ -133,6 +142,7 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat(
private ReadChangeStreamResponse.CloseStream createCloseStream() {
return ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.addNewPartitions(createNewPartitionForCloseStream())
.setStatus(com.google.rpc.Status.newBuilder().setCode(0).build())
.build();
}
Expand Down
Loading