Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to libraries-bom 26.31.0 , Add support for protos and enums for spanner dataflow templates #30181

Merged
merged 7 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Arrow version was bumped to 15.0.0 from 5.0.0 ([#30181](https://github.com/apache/beam/pull/30181)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,12 @@ class BeamModulePlugin implements Plugin<Project> {
def dbcp2_version = "2.9.0"
def errorprone_version = "2.10.0"
// Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom
def gax_version = "2.39.0"
def gax_version = "2.41.0"
def google_ads_version = "26.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.16"
// Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.56.0"
def google_cloud_spanner_version = "6.57.0"
def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
Expand All @@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin<Project> {
def log4j2_version = "2.20.0"
def nemo_version = "0.1"
// Try to keep netty_version consistent with the netty version in grpc_bom (includes grpc_netty) in google_cloud_platform_libraries_bom
def netty_version = "4.1.87.Final"
def netty_version = "4.1.100.Final"
def postgres_version = "42.2.16"
def powermock_version = "2.0.9"
// Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom
Expand All @@ -640,7 +640,8 @@ class BeamModulePlugin implements Plugin<Project> {
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
def testcontainers_version = "1.17.3"
def arrow_version = "5.0.0"
// Try to keep arrow_version consistent with the arrow version in google_cloud_bigquery, managed by google_cloud_platform_libraries_bom
def arrow_version = "15.0.0"
def jmh_version = "1.34"
def jupiter_version = "5.7.0"

Expand Down Expand Up @@ -756,9 +757,9 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
// The release notes shows the versions set by the BOM:
// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.30.0
// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.31.0
// Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.30.0",
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.31.0",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/container/license_scripts/dep_urls_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jaxen:
'1.1.6':
type: "3-Clause BSD"
libraries-bom:
'26.30.0':
'26.31.0':
license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"
type: "Apache License 2.0"
paranamer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private static long estimatePrimitiveValue(Value v) {
return 1;
case INT64:
case FLOAT64:
case ENUM:
return 8;
case DATE:
case TIMESTAMP:
Expand All @@ -116,6 +117,7 @@ private static long estimatePrimitiveValue(Value v) {
case PG_NUMERIC:
return v.isNull() ? 0 : v.getString().length();
case BYTES:
case PROTO:
return v.isNull() ? 0 : v.getBytes().length();
case NUMERIC:
// see
Expand All @@ -141,6 +143,7 @@ private static long estimateArrayValue(Value v) {
case BOOL:
return v.getBoolArray().size();
case INT64:
case ENUM:
return 8L * v.getInt64Array().size();
case FLOAT64:
return 8L * v.getFloat64Array().size();
Expand All @@ -155,6 +158,7 @@ private static long estimateArrayValue(Value v) {
}
return totalLength;
case BYTES:
case PROTO:
totalLength = 0;
for (ByteArray bytes : v.getBytesArray()) {
if (bytes == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class SpannerSchema implements Serializable {

abstract ImmutableList<String> tables();

abstract Dialect dialect();
Expand Down Expand Up @@ -161,6 +162,7 @@ static Column create(String name, String spannerType, Dialect dialect) {
public abstract Type getType();

private static Type parseSpannerType(String spannerType, Dialect dialect) {
String originalSpannerType = spannerType;
spannerType = spannerType.toUpperCase();
switch (dialect) {
case GOOGLE_STANDARD_SQL:
Expand Down Expand Up @@ -193,10 +195,23 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) {
}
if (spannerType.startsWith("ARRAY")) {
// Substring "ARRAY<xxx>"
String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
String spannerArrayType =
originalSpannerType.substring(6, originalSpannerType.length() - 1);
Type itemType = parseSpannerType(spannerArrayType, dialect);
return Type.array(itemType);
}
if (spannerType.startsWith("PROTO")) {
// Substring "PROTO<xxx>"
String spannerProtoType =
originalSpannerType.substring(6, originalSpannerType.length() - 1);
return Type.proto(spannerProtoType);
}
if (spannerType.startsWith("ENUM")) {
// Substring "ENUM<xxx>"
String spannerEnumType =
originalSpannerType.substring(5, originalSpannerType.length() - 1);
return Type.protoEnum(spannerEnumType);
}
throw new IllegalArgumentException("Unknown spanner type " + spannerType);
case POSTGRESQL:
if (spannerType.endsWith("[]")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ public void primitiveArrays() throws Exception {
"{\"key123\":\"value123\", \"key321\":\"value321\"}",
"{\"key456\":\"value456\", \"key789\":600}"))
.build();
Mutation protoEnum =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoEnumArray(ImmutableList.of(1L, 2L, 3L), "customer.app.TestEnum")
.build();
Mutation protos =
Mutation.newInsertOrUpdateBuilder("test")
.set("bytes")
.toProtoMessageArray(
ImmutableList.of(
ByteArray.copyFrom("some_bytes".getBytes(UTF_8)),
ByteArray.copyFrom("some_bytes".getBytes(UTF_8))),
"customer.app.TestMessage")
.build();
assertThat(MutationSizeEstimator.sizeOf(int64), is(24L));
assertThat(MutationSizeEstimator.sizeOf(float64), is(16L));
assertThat(MutationSizeEstimator.sizeOf(bool), is(4L));
Expand All @@ -153,12 +167,19 @@ public void primitiveArrays() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(json), is(62L));
assertThat(MutationSizeEstimator.sizeOf(bytes), is(20L));
assertThat(MutationSizeEstimator.sizeOf(jsonb), is(77L));
assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(24L));
assertThat(MutationSizeEstimator.sizeOf(protos), is(20L));
}

@Test
public void nullPrimitiveArrays() throws Exception {
Mutation int64 =
Mutation.newInsertOrUpdateBuilder("test").set("one").toInt64Array((long[]) null).build();
Mutation protoEnum =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoEnumArray(null, "customer.app.TestEnum")
.build();
Mutation float64 =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
Expand Down Expand Up @@ -187,6 +208,7 @@ public void nullPrimitiveArrays() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(pgNumeric), is(0L));
assertThat(MutationSizeEstimator.sizeOf(json), is(0L));
assertThat(MutationSizeEstimator.sizeOf(jsonb), is(0L));
assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(0L));
}

@Test
Expand Down Expand Up @@ -235,6 +257,38 @@ public void bytes() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L));
}

@Test
public void protos() throws Exception {
Mutation empty =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to(ByteArray.fromBase64(""), "customer.app.TestMessage")
.build();
Mutation nullValue =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to((ByteArray) null, "customer.app.TestMessage")
.build();
Mutation sample =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to(ByteArray.fromBase64("abcdabcd"), "customer.app.TestMessage")
.build();
Mutation nullArray =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoMessageArray(null, "customer.app.TestMessage")
.build();
Mutation deleteBytes =
Mutation.delete("test", Key.of(ByteArray.copyFrom("some_bytes".getBytes(UTF_8))));

assertThat(MutationSizeEstimator.sizeOf(empty), is(0L));
assertThat(MutationSizeEstimator.sizeOf(nullValue), is(0L));
assertThat(MutationSizeEstimator.sizeOf(sample), is(6L));
assertThat(MutationSizeEstimator.sizeOf(nullArray), is(0L));
assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L));
}

@Test
public void jsons() throws Exception {
Mutation empty =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ public void testSingleTable() throws Exception {
.addColumn("test", "maxKey", "STRING(MAX)")
.addColumn("test", "numericVal", "NUMERIC")
.addColumn("test", "jsonVal", "JSON")
.addColumn("test", "protoVal", "PROTO<customer.app.TestMessage>")
.addColumn("test", "enumVal", "ENUM<customer.app.TestEnum>")
.build();

assertEquals(1, schema.getTables().size());
assertEquals(4, schema.getColumns("test").size());
assertEquals(6, schema.getColumns("test").size());
assertEquals(1, schema.getKeyParts("test").size());
assertEquals(Type.json(), schema.getColumns("test").get(3).getType());
assertEquals(
Type.proto("customer.app.TestMessage"), schema.getColumns("test").get(4).getType());
assertEquals(
Type.protoEnum("customer.app.TestEnum"), schema.getColumns("test").get(5).getType());
}

@Test
Expand Down
Loading