Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add toByteString/fromByteString for ChangeStreamContinuationToken #1346

Merged
merged 4 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,72 @@
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** A simple wrapper for {@link StreamContinuationToken}. */
public final class ChangeStreamContinuationToken implements Serializable {
private static final long serialVersionUID = 524679926247095L;

private transient StreamContinuationToken.Builder builder;
private final StreamContinuationToken tokenProto;

private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken.Builder builder) {
this.builder = builder;
private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken tokenProto) {
this.tokenProto = tokenProto;
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
input.defaultReadObject();
builder = StreamContinuationToken.newBuilder().mergeFrom(input);
}

private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
builder.build().writeTo(output);
@InternalApi("Used in Changestream beam pipeline.")
public ChangeStreamContinuationToken(
@Nonnull ByteStringRange byteStringRange, @Nonnull String token) {
this.tokenProto =
StreamContinuationToken.newBuilder()
.setPartition(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build())
.build())
.setToken(token)
.build();
}

// TODO: Change this to return ByteStringRange.
public RowRange getRowRange() {
return this.builder.getPartition().getRowRange();
return this.tokenProto.getPartition().getRowRange();
}

public String getToken() {
return this.builder.getToken();
return this.tokenProto.getToken();
}

/**
* Creates the protobuf. This method is considered an internal implementation detail and not meant
* to be used by applications.
*/
@InternalApi("Used in Changestream beam pipeline.")
public StreamContinuationToken toProto() {
return builder.build();
// Creates the protobuf.
StreamContinuationToken toProto() {
return tokenProto;
}

/** Wraps the protobuf {@link StreamContinuationToken}. */
@InternalApi("Used in Changestream beam pipeline.")
public static ChangeStreamContinuationToken fromProto(
static ChangeStreamContinuationToken fromProto(
@Nonnull StreamContinuationToken streamContinuationToken) {
return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder());
return new ChangeStreamContinuationToken(streamContinuationToken);
}

public ChangeStreamContinuationToken clone() {
return new ChangeStreamContinuationToken(this.builder.clone());
@InternalApi("Used in Changestream beam pipeline.")
public ByteString toByteString() {
return tokenProto.toByteString();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ChangeStreamContinuationToken fromByteString(ByteString byteString)
throws InvalidProtocolBufferException {
return new ChangeStreamContinuationToken(
StreamContinuationToken.newBuilder().mergeFrom(byteString).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
Expand Down Expand Up @@ -62,8 +61,10 @@
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*
* Make this class non-final so that we can create a subclass to mock it.
mutianf marked this conversation as resolved.
Show resolved Hide resolved
*/
public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
public class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
private static final long serialVersionUID = 8419520253162024218L;

private final ByteString rowKey;
Expand Down Expand Up @@ -100,8 +101,7 @@ private ChangeStreamMutation(Builder builder) {
* ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
* building the logical mutation.
*/
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createUserMutation(
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
Expand All @@ -114,8 +114,7 @@ public static Builder createUserMutation(
* because `token` and `loWatermark` must be set later when we finish building the logical
* mutation.
*/
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createGcMutation(
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}
Expand Down Expand Up @@ -227,8 +226,7 @@ private Builder(ChangeStreamMutation changeStreamMutation) {
this.lowWatermark = changeStreamMutation.lowWatermark;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setCell(
Builder setCell(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
long timestamp,
Expand All @@ -237,35 +235,30 @@ public Builder setCell(
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteCells(
Builder deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteFamily(@Nonnull String familyName) {
Builder deleteFamily(@Nonnull String familyName) {
this.entries.add(DeleteFamily.create(familyName));
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setToken(@Nonnull String token) {
Builder setToken(@Nonnull String token) {
this.token = token;
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
this.lowWatermark = lowWatermark;
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public ChangeStreamMutation build() {
ChangeStreamMutation build() {
Preconditions.checkArgument(
token != null && lowWatermark != null,
"ChangeStreamMutation must have a continuation token and low watermark.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import java.util.List;
import javax.annotation.Nonnull;

public final class CloseStream implements ChangeStreamRecord, Serializable {
/**
* A simple wrapper for {@link ReadChangeStreamResponse.CloseStream}. Make this class non-final so
* that we can create a subclass to mock it.
mutianf marked this conversation as resolved.
Show resolved Hide resolved
*/
public class CloseStream implements ChangeStreamRecord, Serializable {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
private static final long serialVersionUID = 7316215828353608505L;
private final Status status;
private transient ImmutableList.Builder<ChangeStreamContinuationToken>
Expand Down Expand Up @@ -69,8 +73,7 @@ private void writeObject(ObjectOutputStream output) throws IOException {
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
@InternalApi("Used in Changestream beam pipeline.")
public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Serializable;
import javax.annotation.Nonnull;

/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
@AutoValue
public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;
Expand All @@ -32,8 +33,7 @@ private static Heartbeat create(
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
@InternalApi("Used in Changestream beam pipeline.")
public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
heartbeat.getLowWatermark());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.common.truth.Truth.assertThat;

import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ChangeStreamContinuationTokenTest {

private final String TOKEN = "token";

private ByteStringRange createFakeByteStringRange() {
return ByteStringRange.create("a", "b");
}

// TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange()
// to ChangeStreamContinuationToken::getByteStringRange().
private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) {
return RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build();
}

@Test
public void basicTest() throws Exception {
ByteStringRange byteStringRange = createFakeByteStringRange();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(byteStringRange, TOKEN);
Assert.assertEquals(
changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange));
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(changeStreamContinuationToken);
oos.close();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
ChangeStreamContinuationToken actual = (ChangeStreamContinuationToken) ois.readObject();
assertThat(actual).isEqualTo(changeStreamContinuationToken);
}

@Test
public void fromProtoTest() {
ByteStringRange byteStringRange = createFakeByteStringRange();
RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange);
StreamContinuationToken proto =
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build())
.setToken(TOKEN)
.build();
ChangeStreamContinuationToken changeStreamContinuationToken =
ChangeStreamContinuationToken.fromProto(proto);
Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange);
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);
Assert.assertEquals(
changeStreamContinuationToken,
ChangeStreamContinuationToken.fromProto(changeStreamContinuationToken.toProto()));
}

@Test
public void toByteStringTest() throws Exception {
ByteStringRange byteStringRange = createFakeByteStringRange();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(byteStringRange, TOKEN);
Assert.assertEquals(
changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange));
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);
Assert.assertEquals(
changeStreamContinuationToken,
ChangeStreamContinuationToken.fromByteString(changeStreamContinuationToken.toByteString()));
}
}