diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index 3a2d938e31..23fb47bd82 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -182,6 +182,11 @@ Builder addToCell(@Nonnull String familyName, Value qualifier, Value timestamp, return this; } + Builder mergeToCell(@Nonnull String familyName, Value qualifier, Value timestamp, Value input) { + this.entriesBuilder().add(MergeToCell.create(familyName, qualifier, timestamp, input)); + return this; + } + abstract ChangeStreamMutation build(); } @@ -210,6 +215,13 @@ public RowMutation toRowMutation(@Nonnull String tableId) { addToCell.getQualifier(), addToCell.getTimestamp(), addToCell.getInput()); + } else if (entry instanceof MergeToCell) { + MergeToCell mergeToCell = (MergeToCell) entry; + rowMutation.mergeToCell( + mergeToCell.getFamily(), + mergeToCell.getQualifier(), + mergeToCell.getTimestamp(), + mergeToCell.getInput()); } else { throw new IllegalArgumentException("Unexpected Entry type."); } @@ -242,6 +254,13 @@ public RowMutationEntry toRowMutationEntry() { addToCell.getQualifier(), addToCell.getTimestamp(), addToCell.getInput()); + } else if (entry instanceof MergeToCell) { + MergeToCell mergeToCell = (MergeToCell) entry; + rowMutationEntry.mergeToCell( + mergeToCell.getFamily(), + mergeToCell.getQualifier(), + mergeToCell.getTimestamp(), + mergeToCell.getInput()); } else { throw new IllegalArgumentException("Unexpected Entry type."); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 0fbe786753..40a71b1d3c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -141,6 +141,12 @@ void addToCell( @Nonnull Value timestamp, @Nonnull Value value); + void mergeToCell( + @Nonnull String familyName, + @Nonnull Value qualifier, + @Nonnull Value timestamp, + @Nonnull Value value); + /** * Called to start a SetCell. * diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java index a6335f4076..d40ad7621c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -142,6 +142,15 @@ public void addToCell( this.changeStreamMutationBuilder.addToCell(familyName, qualifier, timestamp, input); } + @Override + public void mergeToCell( + @Nonnull String familyName, + @Nonnull Value qualifier, + @Nonnull Value timestamp, + @Nonnull Value input) { + this.changeStreamMutationBuilder.mergeToCell(familyName, qualifier, timestamp, input); + } + /** {@inheritDoc} */ @Override public void startCell(String family, ByteString qualifier, long timestampMicros) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MergeToCell.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MergeToCell.java new file mode 100644 index 0000000000..cca3aee182 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MergeToCell.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.api.core.InternalApi; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nonnull; + +/** Representation of an MergeToCell mod in a data change. */ +@InternalApi("Intended for use by the BigtableIO in apache/beam only.") +@AutoValue +public abstract class MergeToCell implements Entry, Serializable { + public static MergeToCell create( + @Nonnull String family, + @Nonnull Value qualifier, + @Nonnull Value timestamp, + @Nonnull Value input) { + return new AutoValue_MergeToCell(family, qualifier, timestamp, input); + } + + @Nonnull + public abstract String getFamily(); + + @Nonnull + public abstract Value getQualifier(); + + @Nonnull + public abstract Value getTimestamp(); + + @Nonnull + public abstract Value getInput(); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java index d2b23dd297..dc55756241 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java @@ -21,6 +21,7 @@ import com.google.bigtable.v2.Mutation.DeleteFromColumn; import com.google.bigtable.v2.Mutation.DeleteFromFamily; import com.google.bigtable.v2.Mutation.DeleteFromRow; +import com.google.bigtable.v2.Mutation.MergeToCell; import com.google.bigtable.v2.Mutation.SetCell; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.common.base.Preconditions; @@ -308,6 +309,24 @@ public Mutation addToCell( return this; } + @Override + public Mutation mergeToCell( + @Nonnull String familyName, + @Nonnull Value qualifier, + @Nonnull Value timestamp, + @Nonnull Value value) { + com.google.bigtable.v2.Mutation.Builder builder = com.google.bigtable.v2.Mutation.newBuilder(); + MergeToCell.Builder mergeToCellBuilder = builder.getMergeToCellBuilder(); + mergeToCellBuilder.setFamilyName(familyName); + + qualifier.buildTo(mergeToCellBuilder.getColumnQualifierBuilder()); + timestamp.buildTo(mergeToCellBuilder.getTimestampBuilder()); + value.buildTo(mergeToCellBuilder.getInputBuilder()); + + addMutation(builder.build()); + return this; + } + private void addMutation(com.google.bigtable.v2.Mutation mutation) { Preconditions.checkState(numMutations + 1 <= MAX_MUTATIONS, "Too many mutations per row"); Preconditions.checkState( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutationApi.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutationApi.java index 612d1bb020..3a54f68748 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutationApi.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutationApi.java @@ -138,6 +138,20 @@ default T addToCell( return addToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value); } + /** + * Merges a ByteString accumulator value to a cell in an aggregate column family. + * + *
This is a convenience override that converts Strings to ByteStrings. + * + *
Note: The timestamp values are in microseconds but must match the granularity of the + * table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond + * granularity). For example: `1571902339435000`. + */ + default T mergeToCell( + @Nonnull String familyName, @Nonnull String qualifier, long timestamp, ByteString value) { + return mergeToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value); + } + /** * Adds an int64 value to an aggregate cell. The column family must be an aggregate family and * have an "int64" input type or this mutation will be rejected. @@ -155,6 +169,22 @@ default T addToCell( Value.IntValue.create(input)); } + /** + * Merges a ByteString accumulator value to a cell in an aggregate column family. + * + *
Note: The timestamp values are in microseconds but must match the granularity of the + * table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond + * granularity). For example: `1571902339435000`. + */ + default T mergeToCell( + @Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, ByteString input) { + return mergeToCell( + familyName, + Value.RawValue.create(qualifier), + Value.RawTimestamp.create(timestamp), + Value.RawValue.create(input)); + } + /** * Adds a {@link Value} to an aggregate cell. The column family must be an aggregate family and * have an input type matching the type of {@link Value} or this mutation will be rejected. @@ -168,4 +198,18 @@ T addToCell( @Nonnull Value qualifier, @Nonnull Value timestamp, @Nonnull Value input); + + /** + * Merges a {@link Value} accumulator to an aggregate cell. The column family must be an aggregate + * family or this mutation will be rejected. + * + *
Note: The timestamp values are in microseconds but must match the granularity of the
+ * table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
+ * granularity). For example: `1571902339435000`.
+ */
+ T mergeToCell(
+ @Nonnull String familyName,
+ @Nonnull Value qualifier,
+ @Nonnull Value timestamp,
+ @Nonnull Value input);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutation.java
index 4dfe751225..cee0a37f19 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutation.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutation.java
@@ -237,6 +237,16 @@ public RowMutation addToCell(
return this;
}
+ @Override
+ public RowMutation mergeToCell(
+ @Nonnull String familyName,
+ @Nonnull Value qualifier,
+ @Nonnull Value timestamp,
+ @Nonnull Value input) {
+ mutation.mergeToCell(familyName, qualifier, timestamp, input);
+ return this;
+ }
+
@InternalApi
public MutateRowRequest toProto(RequestContext requestContext) {
MutateRowRequest.Builder builder = MutateRowRequest.newBuilder();
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutationEntry.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutationEntry.java
index ede90eb6ac..80ffe53737 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutationEntry.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowMutationEntry.java
@@ -190,6 +190,16 @@ public RowMutationEntry addToCell(
return this;
}
+ @Override
+ public RowMutationEntry mergeToCell(
+ @Nonnull String familyName,
+ @Nonnull Value qualifier,
+ @Nonnull Value timestamp,
+ @Nonnull Value input) {
+ mutation.mergeToCell(familyName, qualifier, timestamp, input);
+ return this;
+ }
+
@InternalApi
public MutateRowsRequest.Entry toProto() {
Preconditions.checkArgument(
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java
index 912b55eceb..b41acc4ac3 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java
@@ -485,6 +485,16 @@ State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
Value.fromProto(mod.getAddToCell().getColumnQualifier()),
Value.fromProto(mod.getAddToCell().getTimestamp()),
Value.fromProto(mod.getAddToCell().getInput()));
+ continue;
+ }
+ // Case 5: MergeToCell
+ if (mod.hasMergeToCell()) {
+ builder.mergeToCell(
+ mod.getMergeToCell().getFamilyName(),
+ Value.fromProto(mod.getMergeToCell().getColumnQualifier()),
+ Value.fromProto(mod.getMergeToCell().getTimestamp()),
+ Value.fromProto(mod.getMergeToCell().getInput()));
+ continue;
}
throw new IllegalStateException(
"Received unknown mod type. You may need to upgrade your Bigtable client.");
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
index 948c083224..61c028cdb6 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
@@ -67,6 +67,11 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
Value.rawValue(ByteString.copyFromUtf8("col1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
+ .mergeToCell(
+ "agg-family",
+ Value.rawValue(ByteString.copyFromUtf8("col2")),
+ Value.rawTimestamp(1000),
+ Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
@@ -150,6 +155,11 @@ public void toRowMutationTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
+ .mergeToCell(
+ "agg-family",
+ Value.rawValue(ByteString.copyFromUtf8("qual2")),
+ Value.rawTimestamp(1000),
+ Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
@@ -161,7 +171,7 @@ public void toRowMutationTest() {
NameUtil.formatTableName(
REQUEST_CONTEXT.getProjectId(), REQUEST_CONTEXT.getInstanceId(), TABLE_ID);
assertThat(mutateRowRequest.getTableName()).isEqualTo(tableName);
- assertThat(mutateRowRequest.getMutationsList()).hasSize(4);
+ assertThat(mutateRowRequest.getMutationsList()).hasSize(5);
assertThat(mutateRowRequest.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowRequest.getMutations(1).getDeleteFromFamily().getFamilyName())
@@ -178,6 +188,14 @@ public void toRowMutationTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
+ assertThat(mutateRowRequest.getMutations(4).getMergeToCell())
+ .isEqualTo(
+ Mutation.MergeToCell.newBuilder()
+ .setFamilyName("agg-family")
+ .setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
+ .setTimestamp(Value.rawTimestamp(1000).toProto())
+ .setInput(Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))).toProto())
+ .build());
}
@Test
@@ -220,6 +238,11 @@ public void toRowMutationEntryTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
+ .mergeToCell(
+ "agg-family",
+ Value.rawValue(ByteString.copyFromUtf8("qual2")),
+ Value.rawTimestamp(1000),
+ Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
@@ -228,7 +251,7 @@ public void toRowMutationEntryTest() {
RowMutationEntry rowMutationEntry = changeStreamMutation.toRowMutationEntry();
MutateRowsRequest.Entry mutateRowsRequestEntry = rowMutationEntry.toProto();
assertThat(mutateRowsRequestEntry.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
- assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(4);
+ assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(5);
assertThat(mutateRowsRequestEntry.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowsRequestEntry.getMutations(1).getDeleteFromFamily().getFamilyName())
@@ -245,6 +268,14 @@ public void toRowMutationEntryTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
+ assertThat(mutateRowsRequestEntry.getMutations(4).getMergeToCell())
+ .isEqualTo(
+ Mutation.MergeToCell.newBuilder()
+ .setFamilyName("agg-family")
+ .setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
+ .setTimestamp(Value.rawTimestamp(1000).toProto())
+ .setInput(Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))).toProto())
+ .build());
}
@Test
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
index fca65f90f5..3ba1de6701 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
@@ -21,6 +21,7 @@
import com.google.bigtable.v2.Mutation.DeleteFromColumn;
import com.google.bigtable.v2.Mutation.DeleteFromFamily;
import com.google.bigtable.v2.Mutation.DeleteFromRow;
+import com.google.bigtable.v2.Mutation.MergeToCell;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
@@ -195,6 +196,21 @@ public void addToCellTest() {
assertThat(actual).containsExactly(builder.build());
}
+ @Test
+ public void mergeToCellTest() {
+ mutation.mergeToCell("cf1", "q", 10000, ByteString.copyFrom(Longs.toByteArray(1234L)));
+ List