diff --git a/CHANGES.md b/CHANGES.md index bb1dd5a117a40..bda5937a93d02 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* Arrow version was bumped to 15.0.0 from 5.0.0 ([#30181](https://github.com/apache/beam/pull/30181)). ## Deprecations diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8aaec52bb8c2c..9e86fedb0e99d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -602,12 +602,12 @@ class BeamModulePlugin implements Plugin { def dbcp2_version = "2.9.0" def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom - def gax_version = "2.39.0" + def gax_version = "2.41.0" def google_ads_version = "26.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.16" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom - def google_cloud_spanner_version = "6.56.0" + def google_cloud_spanner_version = "6.57.0" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom @@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin { def log4j2_version = "2.20.0" def nemo_version = "0.1" // Try to keep netty_version consistent with the netty version in grpc_bom (includes grpc_netty) in google_cloud_platform_libraries_bom - def netty_version = "4.1.87.Final" + def netty_version = "4.1.100.Final" def postgres_version = "42.2.16" def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom @@ -640,7 +640,8 @@ class BeamModulePlugin implements Plugin { def spark3_version = "3.2.2" def spotbugs_version = "4.0.6" def testcontainers_version = "1.17.3" - def arrow_version = "5.0.0" + // Try to keep arrow_version consistent with the arrow version in google_cloud_bigquery, managed by google_cloud_platform_libraries_bom + def arrow_version = "15.0.0" def jmh_version = "1.34" def jupiter_version = "5.7.0" @@ -756,9 +757,9 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version // The release notes shows the versions set by the BOM: - // https://github.com/googleapis/java-cloud-bom/releases/tag/v26.30.0 + // https://github.com/googleapis/java-cloud-bom/releases/tag/v26.31.0 // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml - google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.30.0", + google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.31.0", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 42a7c7a0a7444..a084805c70deb 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -46,7 +46,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '26.30.0': + '26.31.0': license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE" type: "Apache License 2.0" paranamer: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java index b0a1da5fb15e3..83fcf026cc1a1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java @@ -108,6 +108,7 @@ private static long estimatePrimitiveValue(Value v) { return 1; case INT64: case FLOAT64: + case ENUM: return 8; case DATE: case TIMESTAMP: @@ -116,6 +117,7 @@ private static long estimatePrimitiveValue(Value v) { case PG_NUMERIC: return v.isNull() ? 0 : v.getString().length(); case BYTES: + case PROTO: return v.isNull() ? 0 : v.getBytes().length(); case NUMERIC: // see @@ -141,6 +143,7 @@ private static long estimateArrayValue(Value v) { case BOOL: return v.getBoolArray().size(); case INT64: + case ENUM: return 8L * v.getInt64Array().size(); case FLOAT64: return 8L * v.getFloat64Array().size(); @@ -155,6 +158,7 @@ private static long estimateArrayValue(Value v) { } return totalLength; case BYTES: + case PROTO: totalLength = 0; for (ByteArray bytes : v.getBytesArray()) { if (bytes == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java index e7587365fe12f..1196dbe0a53c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java @@ -35,6 +35,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public abstract class SpannerSchema implements Serializable { + abstract ImmutableList tables(); abstract Dialect dialect(); @@ -161,6 +162,7 @@ static Column create(String name, String spannerType, Dialect dialect) { public abstract Type getType(); private static Type parseSpannerType(String spannerType, Dialect dialect) { + String originalSpannerType = spannerType; spannerType = spannerType.toUpperCase(); switch (dialect) { case GOOGLE_STANDARD_SQL: @@ -193,10 +195,23 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) { } if (spannerType.startsWith("ARRAY")) { // Substring "ARRAY" - String spannerArrayType = spannerType.substring(6, spannerType.length() - 1); + String spannerArrayType = + originalSpannerType.substring(6, originalSpannerType.length() - 1); Type itemType = parseSpannerType(spannerArrayType, dialect); return Type.array(itemType); } + if (spannerType.startsWith("PROTO")) { + // Substring "PROTO" + String spannerProtoType = + originalSpannerType.substring(6, originalSpannerType.length() - 1); + return Type.proto(spannerProtoType); + } + if (spannerType.startsWith("ENUM")) { + // Substring "ENUM" + String spannerEnumType = + originalSpannerType.substring(5, originalSpannerType.length() - 1); + return Type.protoEnum(spannerEnumType); + } throw new IllegalArgumentException("Unknown spanner type " + spannerType); case POSTGRESQL: if (spannerType.endsWith("[]")) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java index ebabfa8b575fd..497e33d3cfc9c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java @@ -145,6 +145,20 @@ public void primitiveArrays() throws Exception { "{\"key123\":\"value123\", \"key321\":\"value321\"}", "{\"key456\":\"value456\", \"key789\":600}")) .build(); + Mutation protoEnum = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .toProtoEnumArray(ImmutableList.of(1L, 2L, 3L), "customer.app.TestEnum") + .build(); + Mutation protos = + Mutation.newInsertOrUpdateBuilder("test") + .set("bytes") + .toProtoMessageArray( + ImmutableList.of( + ByteArray.copyFrom("some_bytes".getBytes(UTF_8)), + ByteArray.copyFrom("some_bytes".getBytes(UTF_8))), + "customer.app.TestMessage") + .build(); assertThat(MutationSizeEstimator.sizeOf(int64), is(24L)); assertThat(MutationSizeEstimator.sizeOf(float64), is(16L)); assertThat(MutationSizeEstimator.sizeOf(bool), is(4L)); @@ -153,12 +167,19 @@ public void primitiveArrays() throws Exception { assertThat(MutationSizeEstimator.sizeOf(json), is(62L)); assertThat(MutationSizeEstimator.sizeOf(bytes), is(20L)); assertThat(MutationSizeEstimator.sizeOf(jsonb), is(77L)); + assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(24L)); + assertThat(MutationSizeEstimator.sizeOf(protos), is(20L)); } @Test public void nullPrimitiveArrays() throws Exception { Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").toInt64Array((long[]) null).build(); + Mutation protoEnum = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .toProtoEnumArray(null, "customer.app.TestEnum") + .build(); Mutation float64 = Mutation.newInsertOrUpdateBuilder("test") .set("one") @@ -187,6 +208,7 @@ public void nullPrimitiveArrays() throws Exception { assertThat(MutationSizeEstimator.sizeOf(pgNumeric), is(0L)); assertThat(MutationSizeEstimator.sizeOf(json), is(0L)); assertThat(MutationSizeEstimator.sizeOf(jsonb), is(0L)); + assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(0L)); } @Test @@ -235,6 +257,38 @@ public void bytes() throws Exception { assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L)); } + @Test + public void protos() throws Exception { + Mutation empty = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .to(ByteArray.fromBase64(""), "customer.app.TestMessage") + .build(); + Mutation nullValue = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .to((ByteArray) null, "customer.app.TestMessage") + .build(); + Mutation sample = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .to(ByteArray.fromBase64("abcdabcd"), "customer.app.TestMessage") + .build(); + Mutation nullArray = + Mutation.newInsertOrUpdateBuilder("test") + .set("one") + .toProtoMessageArray(null, "customer.app.TestMessage") + .build(); + Mutation deleteBytes = + Mutation.delete("test", Key.of(ByteArray.copyFrom("some_bytes".getBytes(UTF_8)))); + + assertThat(MutationSizeEstimator.sizeOf(empty), is(0L)); + assertThat(MutationSizeEstimator.sizeOf(nullValue), is(0L)); + assertThat(MutationSizeEstimator.sizeOf(sample), is(6L)); + assertThat(MutationSizeEstimator.sizeOf(nullArray), is(0L)); + assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L)); + } + @Test public void jsons() throws Exception { Mutation empty = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java index 7ba345a24885d..166df1704ca8a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java @@ -38,12 +38,18 @@ public void testSingleTable() throws Exception { .addColumn("test", "maxKey", "STRING(MAX)") .addColumn("test", "numericVal", "NUMERIC") .addColumn("test", "jsonVal", "JSON") + .addColumn("test", "protoVal", "PROTO") + .addColumn("test", "enumVal", "ENUM") .build(); assertEquals(1, schema.getTables().size()); - assertEquals(4, schema.getColumns("test").size()); + assertEquals(6, schema.getColumns("test").size()); assertEquals(1, schema.getKeyParts("test").size()); assertEquals(Type.json(), schema.getColumns("test").get(3).getType()); + assertEquals( + Type.proto("customer.app.TestMessage"), schema.getColumns("test").get(4).getType()); + assertEquals( + Type.protoEnum("customer.app.TestEnum"), schema.getColumns("test").get(5).getType()); } @Test