From 71a4bb9afbdffbc3d172f8e80526b1f5fd128847 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Thu, 11 Aug 2022 09:39:38 -0400 Subject: [PATCH 1/4] feat: Add toByteString/fromByteString for ChangeStreamContinuationToken This will be used by the beam connector to write/read to a Bigtable table. This PR also does: 1. Revert the changes in https://github.com/googleapis/java-bigtable/pull/1345 since we can use Mockito to create mock objects for testing. --- .../models/ChangeStreamContinuationToken.java | 62 +++++++----- .../data/v2/models/ChangeStreamMutation.java | 27 ++--- .../bigtable/data/v2/models/CloseStream.java | 5 +- .../bigtable/data/v2/models/Heartbeat.java | 3 +- .../ChangeStreamContinuationTokenTest.java | 99 +++++++++++++++++++ 5 files changed, 149 insertions(+), 47 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index bb5363a3a5..992310edfc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -18,11 +18,11 @@ import com.google.api.core.InternalApi; import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import com.google.protobuf.ByteString; import java.io.Serializable; import javax.annotation.Nonnull; @@ -30,48 +30,62 @@ public final class ChangeStreamContinuationToken implements Serializable { private static final long serialVersionUID = 524679926247095L; - private transient StreamContinuationToken.Builder builder; + private final StreamContinuationToken proto; - private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken.Builder builder) { - this.builder = builder; + private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken proto) { + this.proto = proto; } - private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { - input.defaultReadObject(); - builder = StreamContinuationToken.newBuilder().mergeFrom(input); - } - - private void writeObject(ObjectOutputStream output) throws IOException { - output.defaultWriteObject(); - builder.build().writeTo(output); + @InternalApi("Used in Changestream beam pipeline.") + public ChangeStreamContinuationToken( + @Nonnull ByteStringRange byteStringRange, @Nonnull String token) { + this.proto = + StreamContinuationToken.newBuilder() + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build()) + .build()) + .setToken(token) + .build(); } + // TODO: Change this to return ByteStringRange. public RowRange getRowRange() { - return this.builder.getPartition().getRowRange(); + return this.proto.getPartition().getRowRange(); } public String getToken() { - return this.builder.getToken(); + return this.proto.getToken(); } /** * Creates the protobuf. This method is considered an internal implementation detail and not meant * to be used by applications. */ - @InternalApi("Used in Changestream beam pipeline.") - public StreamContinuationToken toProto() { - return builder.build(); + StreamContinuationToken toProto() { + return proto; } /** Wraps the protobuf {@link StreamContinuationToken}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static ChangeStreamContinuationToken fromProto( + static ChangeStreamContinuationToken fromProto( @Nonnull StreamContinuationToken streamContinuationToken) { - return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder()); + return new ChangeStreamContinuationToken(streamContinuationToken); } - public ChangeStreamContinuationToken clone() { - return new ChangeStreamContinuationToken(this.builder.clone()); + @InternalApi("Used in Changestream beam pipeline.") + public ByteString toByteString() { + return proto.toByteString(); + } + + @InternalApi("Used in Changestream beam pipeline.") + public static ChangeStreamContinuationToken fromByteString(ByteString byteString) + throws Exception { + return new ChangeStreamContinuationToken( + StreamContinuationToken.newBuilder().mergeFrom(byteString).build()); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index f9107220b3..92c0ccbd47 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.models; -import com.google.api.core.InternalApi; import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; @@ -63,7 +62,7 @@ * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build(); * } */ -public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable { +public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; private final ByteString rowKey; @@ -100,8 +99,7 @@ private ChangeStreamMutation(Builder builder) { * ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish * building the logical mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createUserMutation( + static Builder createUserMutation( @Nonnull ByteString rowKey, @Nonnull String sourceClusterId, @Nonnull Timestamp commitTimestamp, @@ -114,8 +112,7 @@ public static Builder createUserMutation( * because `token` and `loWatermark` must be set later when we finish building the logical * mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createGcMutation( + static Builder createGcMutation( @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker); } @@ -227,8 +224,7 @@ private Builder(ChangeStreamMutation changeStreamMutation) { this.lowWatermark = changeStreamMutation.lowWatermark; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setCell( + Builder setCell( @Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, @@ -237,8 +233,7 @@ public Builder setCell( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteCells( + Builder deleteCells( @Nonnull String familyName, @Nonnull ByteString qualifier, @Nonnull TimestampRange timestampRange) { @@ -246,26 +241,22 @@ public Builder deleteCells( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteFamily(@Nonnull String familyName) { + Builder deleteFamily(@Nonnull String familyName) { this.entries.add(DeleteFamily.create(familyName)); return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setToken(@Nonnull String token) { + Builder setToken(@Nonnull String token) { this.token = token; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { + Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { this.lowWatermark = lowWatermark; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public ChangeStreamMutation build() { + ChangeStreamMutation build() { Preconditions.checkArgument( token != null && lowWatermark != null, "ChangeStreamMutation must have a continuation token and low watermark."); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index e871c86697..e245ce615e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -29,7 +29,7 @@ import java.util.List; import javax.annotation.Nonnull; -public final class CloseStream implements ChangeStreamRecord, Serializable { +public class CloseStream implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608505L; private final Status status; private transient ImmutableList.Builder @@ -69,8 +69,7 @@ private void writeObject(ObjectOutputStream output) throws IOException { } /** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { + static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index 63c23525f3..f2371c8507 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -32,8 +32,7 @@ private static Heartbeat create( } /** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { + static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { return create( ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()), heartbeat.getLowWatermark()); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java new file mode 100644 index 0000000000..e1ba6c68f6 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChangeStreamContinuationTokenTest { + + private final String TOKEN = "token"; + + private ByteStringRange createFakeByteStringRange() { + return ByteStringRange.create("a", "b"); + } + + // TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange() + // to ChangeStreamContinuationToken::getByteStringRange(). + private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) { + return RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build(); + } + + @Test + public void basicTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(changeStreamContinuationToken); + oos.close(); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + ChangeStreamContinuationToken actual = (ChangeStreamContinuationToken) ois.readObject(); + assertThat(actual).isEqualTo(changeStreamContinuationToken); + } + + @Test + public void toProtoTest() { + ByteStringRange byteStringRange = createFakeByteStringRange(); + RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange); + StreamContinuationToken proto = + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build()) + .setToken(TOKEN) + .build(); + ChangeStreamContinuationToken changeStreamContinuationToken = + ChangeStreamContinuationToken.fromProto(proto); + Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromProto(changeStreamContinuationToken.toProto())); + } + + @Test + public void toByteStringTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromByteString(changeStreamContinuationToken.toByteString())); + } +} From 726862a8142ce77d3a324b3ced3ce99b54188b57 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Thu, 11 Aug 2022 09:57:16 -0400 Subject: [PATCH 2/4] fix: Update comments --- .../data/v2/models/ChangeStreamContinuationToken.java | 3 ++- .../cloud/bigtable/data/v2/models/ChangeStreamMutation.java | 2 ++ .../com/google/cloud/bigtable/data/v2/models/CloseStream.java | 4 ++++ .../com/google/cloud/bigtable/data/v2/models/Heartbeat.java | 1 + 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index 992310edfc..baca6220aa 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -23,6 +23,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.Serializable; import javax.annotation.Nonnull; @@ -83,7 +84,7 @@ public ByteString toByteString() { @InternalApi("Used in Changestream beam pipeline.") public static ChangeStreamContinuationToken fromByteString(ByteString byteString) - throws Exception { + throws InvalidProtocolBufferException { return new ChangeStreamContinuationToken( StreamContinuationToken.newBuilder().mergeFrom(byteString).build()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index 92c0ccbd47..610380c8e8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -61,6 +61,8 @@ * builder.deleteCells(...); * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build(); * } + * + * Make this class non-final so that we can create a subclass to mock it. */ public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index e245ce615e..ccb46cf3fb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -29,6 +29,10 @@ import java.util.List; import javax.annotation.Nonnull; +/** + * A simple wrapper for {@link ReadChangeStreamResponse.CloseStream}. Make this class non-final so + * that we can create a subclass to mock it. + */ public class CloseStream implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608505L; private final Status status; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index f2371c8507..da8586909d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -22,6 +22,7 @@ import java.io.Serializable; import javax.annotation.Nonnull; +/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */ @AutoValue public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L; From a1d3a9223c7dda5cd48cf25c277ea0417f9736df Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Thu, 11 Aug 2022 11:46:04 -0400 Subject: [PATCH 3/4] fix: Address comments --- .../models/ChangeStreamContinuationToken.java | 21 ++++++++----------- .../ChangeStreamContinuationTokenTest.java | 2 +- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index baca6220aa..af7b15ea4e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -31,16 +31,16 @@ public final class ChangeStreamContinuationToken implements Serializable { private static final long serialVersionUID = 524679926247095L; - private final StreamContinuationToken proto; + private final StreamContinuationToken tokenProto; - private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken proto) { - this.proto = proto; + private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken tokenProto) { + this.tokenProto = tokenProto; } @InternalApi("Used in Changestream beam pipeline.") public ChangeStreamContinuationToken( @Nonnull ByteStringRange byteStringRange, @Nonnull String token) { - this.proto = + this.tokenProto = StreamContinuationToken.newBuilder() .setPartition( StreamPartition.newBuilder() @@ -56,19 +56,16 @@ public ChangeStreamContinuationToken( // TODO: Change this to return ByteStringRange. public RowRange getRowRange() { - return this.proto.getPartition().getRowRange(); + return this.tokenProto.getPartition().getRowRange(); } public String getToken() { - return this.proto.getToken(); + return this.tokenProto.getToken(); } - /** - * Creates the protobuf. This method is considered an internal implementation detail and not meant - * to be used by applications. - */ + // Creates the protobuf. StreamContinuationToken toProto() { - return proto; + return tokenProto; } /** Wraps the protobuf {@link StreamContinuationToken}. */ @@ -79,7 +76,7 @@ static ChangeStreamContinuationToken fromProto( @InternalApi("Used in Changestream beam pipeline.") public ByteString toByteString() { - return proto.toByteString(); + return tokenProto.toByteString(); } @InternalApi("Used in Changestream beam pipeline.") diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java index e1ba6c68f6..e93dfc70bf 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java @@ -67,7 +67,7 @@ public void basicTest() throws Exception { } @Test - public void toProtoTest() { + public void fromProtoTest() { ByteStringRange byteStringRange = createFakeByteStringRange(); RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange); StreamContinuationToken proto = From b75467ee3009f8be1cf8f7a559a583d26c91194d Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Thu, 11 Aug 2022 12:02:17 -0400 Subject: [PATCH 4/4] fix: Add InternalExtensionOnly annotations for Heartbeat/CloseStream/ChangeStreamMutation --- .../cloud/bigtable/data/v2/models/ChangeStreamMutation.java | 2 ++ .../com/google/cloud/bigtable/data/v2/models/CloseStream.java | 2 ++ .../com/google/cloud/bigtable/data/v2/models/Heartbeat.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index 610380c8e8..cfb8bb30b7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import com.google.api.core.InternalExtensionOnly; import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; @@ -64,6 +65,7 @@ * * Make this class non-final so that we can create a subclass to mock it. */ +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index ccb46cf3fb..346b0b60a7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.models; import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.StreamContinuationToken; import com.google.common.base.MoreObjects; @@ -33,6 +34,7 @@ * A simple wrapper for {@link ReadChangeStreamResponse.CloseStream}. Make this class non-final so * that we can create a subclass to mock it. */ +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") public class CloseStream implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608505L; private final Status status; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index da8586909d..40daa9d23a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.models; import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.protobuf.Timestamp; @@ -23,6 +24,7 @@ import javax.annotation.Nonnull; /** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */ +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") @AutoValue public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L;