Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Change types for Cloud Bigtable Changestream methods #1639

Merged
merged 3 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,41 @@
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/Heartbeat</className>
<method>*getEstimatedLowWatermark*</method>
<to>long</to>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method return type is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
<method>*getCommitTimestamp*</method>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
<method>*getEstimatedLowWatermark*</method>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method argument type is ok because ChangeStreamRecordAdapter is InternalApi -->
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter$ChangeStreamRecordBuilder</className>
<method>*</method>
<to>*</to>
</difference>
<!-- change method argument type is ok because ReadChangeStreamQuery is InternalApi -->
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ReadChangeStreamQuery</className>
<method>*</method>
<to>*</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package com.google.cloud.bigtable.common;

import com.google.common.base.Objects;
import java.io.Serializable;

/**
* The `Status` type defines a logical error model. Each `Status` message contains an error code and
* a error message.
*
* <p>This primarily wraps the protobuf {@link com.google.rpc.Status}.
*/
public final class Status {
public final class Status implements Serializable {
private static final long serialVersionUID = -5512896228725308380L;

public enum Code {
OK(com.google.rpc.Code.OK),
CANCELLED(com.google.rpc.Code.CANCELLED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.protobuf.ByteString;
import java.io.Serializable;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/**
* A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
Expand Down Expand Up @@ -72,7 +73,7 @@ public enum MutationType {
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker) {
return builder()
.setRowKey(rowKey)
Expand All @@ -88,7 +89,7 @@ static Builder createUserMutation(
* mutation.
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
return builder()
.setRowKey(rowKey)
.setType(MutationType.GARBAGE_COLLECTION)
Expand All @@ -110,7 +111,7 @@ static Builder createGcMutation(
public abstract String getSourceClusterId();

/** Get the commit timestamp of the current mutation. */
public abstract long getCommitTimestamp();
public abstract Instant getCommitTimestamp();

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
Expand All @@ -123,7 +124,7 @@ static Builder createGcMutation(
public abstract String getToken();

/** Get the low watermark of the current mutation. */
public abstract long getEstimatedLowWatermark();
public abstract Instant getEstimatedLowWatermark();

/** Get the list of mods of the current mutation. */
@Nonnull
Expand All @@ -144,15 +145,15 @@ abstract static class Builder {

abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);

abstract Builder setCommitTimestamp(long commitTimestamp);
abstract Builder setCommitTimestamp(Instant commitTimestamp);

abstract Builder setTieBreaker(int tieBreaker);

abstract ImmutableList.Builder<Entry> entriesBuilder();

abstract Builder setToken(@Nonnull String token);

abstract Builder setEstimatedLowWatermark(long estimatedLowWatermark);
abstract Builder setEstimatedLowWatermark(Instant estimatedLowWatermark);

Builder setCell(
@Nonnull String familyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/**
* An extension point that allows end users to plug in a custom implementation of logical change
Expand Down Expand Up @@ -115,15 +116,15 @@ interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker);

/**
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
* once. If called, the current change stream record must not include any close stream message
* or heartbeat.
*/
void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker);
void startGcMutation(@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker);

/** Called to add a DeleteFamily mod. */
void deleteFamily(@Nonnull String familyName);
Expand Down Expand Up @@ -164,7 +165,7 @@ void deleteCells(

/** Called once per stream record to signal that all mods have been processed (unless reset). */
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, long estimatedLowWatermark);
@Nonnull String token, Instant estimatedLowWatermark);

/** Called when the current in progress change stream record should be dropped */
void reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Status;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nonnull;
Expand All @@ -34,8 +34,9 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608505L;

private static CloseStream create(
Status status, List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(status, changeStreamContinuationTokens);
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/**
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
Expand Down Expand Up @@ -102,7 +103,7 @@ public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream clo
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createUserMutation(
Expand All @@ -111,7 +112,8 @@ public void startUserMutation(

/** {@inheritDoc} */
@Override
public void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
public void startGcMutation(
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
}
Expand Down Expand Up @@ -156,7 +158,7 @@ public void finishCell() {
/** {@inheritDoc} */
@Override
public ChangeStreamRecord finishChangeStreamMutation(
@Nonnull String token, long estimatedLowWatermark) {
@Nonnull String token, Instant estimatedLowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark);
return this.changeStreamMutationBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
Expand All @@ -29,20 +29,22 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;

private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken, long estimatedLowWatermark) {
ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) {
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
Timestamps.toNanos(heartbeat.getEstimatedLowWatermark()));
Instant.ofEpochSecond(
heartbeat.getEstimatedLowWatermark().getSeconds(),
heartbeat.getEstimatedLowWatermark().getNanos()));
}

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract long getEstimatedLowWatermark();
public abstract Instant getEstimatedLowWatermark();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Timestamps;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/** A simple wrapper to construct a query for the ReadChangeStream RPC. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
Expand Down Expand Up @@ -142,18 +143,26 @@ public ReadChangeStreamQuery streamPartition(ByteStringRange range) {
return streamPartition(rangeBuilder.build());
}

/** Sets the startTime(Nanosecond) to read the change stream. */
public ReadChangeStreamQuery startTime(long value) {
/** Sets the startTime to read the change stream. */
public ReadChangeStreamQuery startTime(Instant value) {
Preconditions.checkState(
!builder.hasContinuationTokens(),
"startTime and continuationTokens can't be specified together");
builder.setStartTime(Timestamps.fromNanos(value));
builder.setStartTime(
Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
.setNanos(value.getNano())
.build());
return this;
}

/** Sets the endTime(Nanosecond) to read the change stream. */
public ReadChangeStreamQuery endTime(long value) {
builder.setEndTime(Timestamps.fromNanos(value));
/** Sets the endTime to read the change stream. */
public ReadChangeStreamQuery endTime(Instant value) {
builder.setEndTime(
Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
.setNanos(value.getNano())
.build());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
import com.google.protobuf.util.Timestamps;
import org.threeten.bp.Instant;

/**
* A state machine to produce change stream records from a stream of {@link
Expand Down Expand Up @@ -338,7 +338,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
"AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id.");
builder.startGcMutation(
dataChange.getRowKey(),
Timestamps.toNanos(dataChange.getCommitTimestamp()),
Instant.ofEpochSecond(
dataChange.getCommitTimestamp().getSeconds(),
dataChange.getCommitTimestamp().getNanos()),
dataChange.getTiebreaker());
} else if (dataChange.getType() == Type.USER) {
validate(
Expand All @@ -347,7 +349,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
builder.startUserMutation(
dataChange.getRowKey(),
dataChange.getSourceClusterId(),
Timestamps.toNanos(dataChange.getCommitTimestamp()),
Instant.ofEpochSecond(
dataChange.getCommitTimestamp().getSeconds(),
dataChange.getCommitTimestamp().getNanos()),
dataChange.getTiebreaker());
} else {
validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType());
Expand Down Expand Up @@ -591,7 +595,10 @@ private State checkAndFinishMutationIfNeeded(
validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark");
completeChangeStreamRecord =
builder.finishChangeStreamMutation(
dataChange.getToken(), Timestamps.toNanos(dataChange.getEstimatedLowWatermark()));
dataChange.getToken(),
Instant.ofEpochSecond(
dataChange.getEstimatedLowWatermark().getSeconds(),
dataChange.getEstimatedLowWatermark().getNanos()));
return AWAITING_STREAM_RECORD_CONSUME;
}
// Case 2_2): The current DataChange itself is chunked, so wait for the next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.rpc.Code;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -89,4 +94,23 @@ public void testToProto() {

assertThat(model.toString()).isEqualTo(proto.toString());
}

@Test
public void testSerialization() throws IOException, ClassNotFoundException {
com.google.rpc.Status proto =
com.google.rpc.Status.newBuilder()
.setCode(Code.UNAVAILABLE.getNumber())
.setMessage("some message")
.build();

Status model = Status.fromProto(proto);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(model);
oos.close();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
Status actual = (Status) ois.readObject();
assertThat(actual).isEqualTo(model);
}
}
Loading