diff --git a/CHANGES.md b/CHANGES.md index cd3bcd3e177a..f9f29c3aa906 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -91,6 +91,7 @@ * Python SDK: Legacy runner support removed from Dataflow, all pipelines must use runner v2. * [Python] Dataflow Runner will no longer stage Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include Apache Beam installation.([#26996](https://github.com/apache/beam/issues/26996)) +* SDK Java Extensions SQL: remove fastjson library, table property is changed to ObjectNode which belongs to jackson library. ([#24154](https://github.com/apache/beam/issues/24154)) ## Deprecations diff --git a/playground/backend/containers/java/Dockerfile b/playground/backend/containers/java/Dockerfile index 3688c435080d..f9d1b24b96d2 100644 --- a/playground/backend/containers/java/Dockerfile +++ b/playground/backend/containers/java/Dockerfile @@ -65,7 +65,6 @@ ENV BEAM_SDK="SDK_JAVA" ENV PROPERTY_PATH=/opt/playground/backend/properties.yaml ARG CALCITE_VERSION=1_28_0 ARG BYTEBUDDY_VERSION=1.12.14 -ARG FASTJSON_VERSION=1.2.69 ARG JANINO_VERSION=3.0.11 # Copy build result @@ -103,9 +102,6 @@ RUN wget https://repo1.maven.org/maven2/org/apache/beam/beam-vendor-calcite-$CAL RUN wget https://repo1.maven.org/maven2/net/bytebuddy/byte-buddy/$BYTEBUDDY_VERSION/byte-buddy-$BYTEBUDDY_VERSION.jar &&\ mv byte-buddy-$BYTEBUDDY_VERSION.jar /opt/apache/beam/jars/byte-buddy-$BYTEBUDDY_VERSION.jar -RUN wget https://repo1.maven.org/maven2/com/alibaba/fastjson/$FASTJSON_VERSION/fastjson-$FASTJSON_VERSION.jar &&\ - mv fastjson-$FASTJSON_VERSION.jar /opt/apache/beam/jars/fastjson-$FASTJSON_VERSION.jar - RUN wget https://repo1.maven.org/maven2/org/codehaus/janino/janino/$JANINO_VERSION/janino-$JANINO_VERSION.jar &&\ mv janino-$JANINO_VERSION.jar /opt/apache/beam/jars/janino-$JANINO_VERSION.jar diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index 01b87ddcfd81..8a22bff69b7f 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -84,7 +84,6 @@ dependencies { implementation library.java.jackson_databind implementation library.java.joda_time implementation library.java.vendored_calcite_1_28_0 - implementation "com.alibaba:fastjson:1.2.69" implementation "org.codehaus.janino:janino:3.0.11" implementation "org.codehaus.janino:commons-compiler:3.0.11" implementation library.java.jackson_core diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle b/sdks/java/extensions/sql/datacatalog/build.gradle index e4508d28d141..cb557cc80776 100644 --- a/sdks/java/extensions/sql/datacatalog/build.gradle +++ b/sdks/java/extensions/sql/datacatalog/build.gradle @@ -34,7 +34,7 @@ dependencies { implementation library.java.protobuf_java implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre - implementation "com.alibaba:fastjson:1.2.69" + implementation library.java.jackson_databind implementation project(path: ":sdks:java:core", configuration: "shadow") implementation "org.threeten:threetenbp:1.4.5" provided project(":sdks:java:extensions:sql") diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryTableFactory.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryTableFactory.java index e799875f88e3..172dad752d93 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryTableFactory.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryTableFactory.java @@ -17,14 +17,13 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; -import com.alibaba.fastjson.JSONObject; import com.google.cloud.datacatalog.v1beta1.Entry; import java.net.URI; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** {@link TableFactory} that understands Data Catalog BigQuery entries. */ class BigQueryTableFactory implements TableFactory { @@ -49,7 +48,7 @@ public Optional tableBuilder(Entry entry) { return Optional.of( Table.builder() .location(getLocation(entry)) - .properties(new JSONObject(ImmutableMap.of("truncateTimestamps", truncateTimestamps))) + .properties(TableUtils.emptyProperties().put("truncateTimestamps", truncateTimestamps)) .type("bigquery") .comment("")); } diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsTableFactory.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsTableFactory.java index 39d963af2a80..3a85362bbeab 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsTableFactory.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsTableFactory.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; -import com.alibaba.fastjson.JSONObject; import com.google.cloud.datacatalog.v1beta1.Entry; import com.google.cloud.datacatalog.v1beta1.GcsFilesetSpec; import java.util.List; import java.util.Optional; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; /** {@link TableFactory} that understands Data Catalog GCS entries. */ @@ -55,7 +55,7 @@ public Optional tableBuilder(Entry entry) { Table.builder() .type("text") .location(filePattern) - .properties(new JSONObject()) + .properties(TableUtils.emptyProperties()) .comment("")); } } diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubTableFactory.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubTableFactory.java index 656f4dd2202f..5688713f7ee8 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubTableFactory.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubTableFactory.java @@ -17,12 +17,12 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; -import com.alibaba.fastjson.JSONObject; import com.google.cloud.datacatalog.v1beta1.Entry; import java.net.URI; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; /** {@link TableFactory} that understands Data Catalog Pubsub entries. */ @@ -42,7 +42,7 @@ public Optional tableBuilder(Entry entry) { return Optional.of( Table.builder() .location(getLocation(entry)) - .properties(new JSONObject()) + .properties(TableUtils.emptyProperties()) .type("pubsub") .comment("")); } diff --git a/sdks/java/extensions/sql/hcatalog/build.gradle b/sdks/java/extensions/sql/hcatalog/build.gradle index d7dc94467ff9..e8abf21b7c3e 100644 --- a/sdks/java/extensions/sql/hcatalog/build.gradle +++ b/sdks/java/extensions/sql/hcatalog/build.gradle @@ -41,7 +41,6 @@ dependencies { implementation project(":sdks:java:extensions:sql") implementation project(":sdks:java:io:hcatalog") implementation project(":sdks:java:core") - implementation "com.alibaba:fastjson:1.2.69" implementation library.java.vendored_guava_32_1_2_jre testImplementation project(":sdks:java:io:hcatalog").sourceSets.test.output diff --git a/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/DatabaseProvider.java b/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/DatabaseProvider.java index 90adeaf85f23..925cca4bcabb 100644 --- a/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/DatabaseProvider.java +++ b/sdks/java/extensions/sql/hcatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/DatabaseProvider.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog; -import com.alibaba.fastjson.JSONObject; import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -80,7 +79,6 @@ public Map getTables() { .schema(tableSchema.get()) .name(table) .location("") - .properties(new JSONObject()) .comment("") .type("hcatalog") .build(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableUtils.java new file mode 100644 index 000000000000..2e52a1bbf422 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonReadFeature; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +public class TableUtils { + private static final ObjectMapper objectMapper = + JsonMapper.builder() + .enable( + JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, + JsonReadFeature.ALLOW_JAVA_COMMENTS, + JsonReadFeature.ALLOW_MISSING_VALUES, + JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS, + JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS, + JsonReadFeature.ALLOW_SINGLE_QUOTES, + JsonReadFeature.ALLOW_TRAILING_COMMA, + JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, + JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES) + .build(); + + private TableUtils() { + // nothing here + } + + @VisibleForTesting + public static ObjectMapper getObjectMapper() { + return objectMapper; + } + + public static ObjectNode emptyProperties() { + return objectMapper.createObjectNode(); + } + + public static ObjectNode parseProperties(String json) { + try { + return (ObjectNode) objectMapper.readTree(json); + } catch (JsonProcessingException e) { + throw new RuntimeException("illegal table properties: " + json); + } + } + + public static Map convertNode2Map(JsonNode jsonNode) { + return objectMapper.convertValue(jsonNode, new TypeReference>() {}); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java index a0007f779c69..e44a152eab04 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java @@ -17,13 +17,12 @@ */ package org.apache.beam.sdk.extensions.sql.impl.parser; -import static com.alibaba.fastjson.JSON.parseObject; import static org.apache.beam.sdk.schemas.Schema.toSchema; import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; -import com.alibaba.fastjson.JSONObject; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -159,8 +158,8 @@ private Table toTable() { .location(SqlDdlNodes.getString(location)) .properties( (tblProperties == null) - ? new JSONObject() - : parseObject(SqlDdlNodes.getString(tblProperties))) + ? TableUtils.emptyProperties() + : TableUtils.parseProperties(SqlDdlNodes.getString(tblProperties))) .build(); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java index 1fb5d9a510b2..23f301dd2455 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java @@ -17,9 +17,10 @@ */ package org.apache.beam.sdk.extensions.sql.meta; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.value.AutoValue; import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.schemas.Schema; import org.checkerframework.checker.nullness.qual.Nullable; @@ -37,12 +38,12 @@ public abstract class Table implements Serializable { public abstract @Nullable String getLocation(); - public abstract JSONObject getProperties(); + public abstract ObjectNode getProperties(); public abstract Builder toBuilder(); public static Builder builder() { - return new AutoValue_Table.Builder().properties(new JSONObject()); + return new AutoValue_Table.Builder().properties(TableUtils.emptyProperties()); } /** Builder class for {@link Table}. */ @@ -58,7 +59,7 @@ public abstract static class Builder { public abstract Builder location(String location); - public abstract Builder properties(JSONObject properties); + public abstract Builder properties(ObjectNode properties); public abstract Table build(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java index a8f3ce672ee0..1e2629353bc8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java @@ -19,8 +19,8 @@ import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.annotations.Internal; @@ -66,7 +66,7 @@ public String getTableType() { @Override public BeamSqlTable buildBeamSqlTable(Table tableDefinition) { - JSONObject tableProperties = tableDefinition.getProperties(); + ObjectNode tableProperties = tableDefinition.getProperties(); try { RowJson.RowJsonDeserializer deserializer = @@ -84,8 +84,7 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) { } catch (InvalidConfigurationException | InvalidSchemaException e) { throw new InvalidTableException(e.getMessage()); } catch (JsonProcessingException e) { - throw new AssertionError( - "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString()); + throw new AssertionError("Failed to re-parse TBLPROPERTIES JSON " + tableProperties); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index 2dc95bdd2b81..1898c28f670c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -81,11 +81,11 @@ class BigQueryTable extends SchemaBaseBeamTable implements Serializable { this.conversionOptions = options; this.bqLocation = table.getLocation(); - if (table.getProperties().containsKey(METHOD_PROPERTY)) { + if (table.getProperties().has(METHOD_PROPERTY)) { List validMethods = Arrays.stream(Method.values()).map(Enum::toString).collect(Collectors.toList()); // toUpperCase should make it case-insensitive - String selectedMethod = table.getProperties().getString(METHOD_PROPERTY).toUpperCase(); + String selectedMethod = table.getProperties().get(METHOD_PROPERTY).asText().toUpperCase(); if (validMethods.contains(selectedMethod)) { method = Method.valueOf(selectedMethod); @@ -105,12 +105,12 @@ class BigQueryTable extends SchemaBaseBeamTable implements Serializable { LOG.info("BigQuery method is set to: {}", method); - if (table.getProperties().containsKey(WRITE_DISPOSITION_PROPERTY)) { + if (table.getProperties().has(WRITE_DISPOSITION_PROPERTY)) { List validWriteDispositions = Arrays.stream(WriteDisposition.values()).map(Enum::toString).collect(Collectors.toList()); // toUpperCase should make it case-insensitive String selectedWriteDisposition = - table.getProperties().getString(WRITE_DISPOSITION_PROPERTY).toUpperCase(); + table.getProperties().get(WRITE_DISPOSITION_PROPERTY).asText().toUpperCase(); if (validWriteDispositions.contains(selectedWriteDisposition)) { writeDisposition = WriteDisposition.valueOf(selectedWriteDisposition); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java index 2882de0194ff..2f6e6d1d8864 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java @@ -17,9 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; - -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -56,10 +54,10 @@ public BeamSqlTable buildBeamSqlTable(Table table) { return new BigQueryTable(table, getConversionOptions(table.getProperties())); } - protected static ConversionOptions getConversionOptions(JSONObject properties) { + protected static ConversionOptions getConversionOptions(ObjectNode properties) { return ConversionOptions.builder() .setTruncateTimestamps( - firstNonNull(properties.getBoolean("truncateTimestamps"), false) + properties.path("truncateTimestamps").asBoolean(false) ? TruncateTimestamps.TRUNCATE : TruncateTimestamps.REJECT) .build(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java index 816b93969d22..60c722d32d2d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java @@ -23,7 +23,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps.newHashMap; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -84,9 +84,9 @@ public class BigtableTable extends SchemaBaseBeamTable implements Serializable { this.emulatorHost = host; } - JSONObject properties = table.getProperties(); - if (properties.containsKey(COLUMNS_MAPPING)) { - columnsMapping = parseColumnsMapping(properties.getString(COLUMNS_MAPPING)); + ObjectNode properties = table.getProperties(); + if (properties.has(COLUMNS_MAPPING)) { + columnsMapping = parseColumnsMapping(properties.get(COLUMNS_MAPPING).asText()); validateColumnsMapping(columnsMapping, schema); useFlatSchema = true; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java index dd1feadfaa54..06cc4ea0beff 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java @@ -21,10 +21,12 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import java.util.List; import java.util.Optional; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; @@ -80,11 +82,11 @@ private static ParsedLocation parseLocation(String location) { return parsed; } - private static List mergeParam(Optional initial, @Nullable List toMerge) { + private static List mergeParam(Optional initial, @Nullable ArrayNode toMerge) { ImmutableList.Builder merged = ImmutableList.builder(); initial.ifPresent(merged::add); if (toMerge != null) { - toMerge.forEach(o -> merged.add(o.toString())); + toMerge.forEach(o -> merged.add(o.asText())); } return merged.build(); } @@ -92,23 +94,23 @@ private static List mergeParam(Optional initial, @Nullable List< @Override public BeamSqlTable buildBeamSqlTable(Table table) { Schema schema = table.getSchema(); - JSONObject properties = table.getProperties(); + ObjectNode properties = table.getProperties(); Optional parsedLocation = Optional.empty(); if (!Strings.isNullOrEmpty(table.getLocation())) { parsedLocation = Optional.of(parseLocation(checkArgumentNotNull(table.getLocation()))); } List topics = - mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics")); + mergeParam(parsedLocation.map(loc -> loc.topic), (ArrayNode) properties.get("topics")); List allBootstrapServers = mergeParam( parsedLocation.map(loc -> loc.brokerLocation), - properties.getJSONArray("bootstrap_servers")); + (ArrayNode) properties.get("bootstrap_servers")); String bootstrapServers = String.join(",", allBootstrapServers); Optional payloadFormat = - properties.containsKey("format") - ? Optional.of(properties.getString("format")) + properties.has("format") + ? Optional.of(properties.get("format").asText()) : Optional.empty(); if (Schemas.isNestedSchema(schema)) { Optional serializer = @@ -117,7 +119,7 @@ public BeamSqlTable buildBeamSqlTable(Table table) { PayloadSerializers.getSerializer( format, checkArgumentNotNull(schema.getField(PAYLOAD_FIELD).getType().getRowSchema()), - properties.getInnerMap())); + TableUtils.convertNode2Map(properties))); return new NestedPayloadKafkaTable(schema, bootstrapServers, topics, serializer); } else { /* @@ -130,7 +132,8 @@ public BeamSqlTable buildBeamSqlTable(Table table) { return new BeamKafkaCSVTable(schema, bootstrapServers, topics); } PayloadSerializer serializer = - PayloadSerializers.getSerializer(payloadFormat.get(), schema, properties.getInnerMap()); + PayloadSerializers.getSerializer( + payloadFormat.get(), schema, TableUtils.convertNode2Map(properties)); return new PayloadSerializerKafkaTable(schema, bootstrapServers, topics, serializer); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProvider.java index 5d0a0ffb0fec..cdc6a4c05dff 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProvider.java @@ -20,13 +20,14 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import com.google.auto.value.AutoOneOf; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.proto.PubSubMessage; import java.util.Optional; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; @@ -78,15 +79,16 @@ public String getTableType() { return "pubsublite"; } - private static Optional getSerializer(Schema schema, JSONObject properties) { + private static Optional getSerializer(Schema schema, ObjectNode properties) { if (schema.getField("payload").getType().equals(FieldType.BYTES)) { checkArgument( - !properties.containsKey("format"), + !properties.has("format"), "Must not set the 'format' property if not unpacking payload."); return Optional.empty(); } - String format = properties.containsKey("format") ? properties.getString("format") : "json"; - return Optional.of(PayloadSerializers.getSerializer(format, schema, properties.getInnerMap())); + String format = properties.path("format").asText("json"); + return Optional.of( + PayloadSerializers.getSerializer(format, schema, TableUtils.convertNode2Map(properties))); } private static void checkFieldHasType(Field field, FieldType type) { @@ -165,9 +167,9 @@ private static RowHandler getRowHandler( private static PTransform, PCollection> addDlqIfPresent( - SimpleFunction transform, JSONObject properties) { - if (properties.containsKey("deadLetterQueue")) { - return new DeadLetteredTransform<>(transform, properties.getString("deadLetterQueue")); + SimpleFunction transform, ObjectNode properties) { + if (properties.has("deadLetterQueue")) { + return new DeadLetteredTransform<>(transform, properties.get("deadLetterQueue").asText()); } return MapElements.via(transform); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java index eea7bc474681..e1924147b982 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java @@ -44,8 +44,8 @@ class GenerateSequenceTable extends SchemaBaseBeamTable implements Serializable GenerateSequenceTable(Table table) { super(TABLE_SCHEMA); - if (table.getProperties().containsKey("elementsPerSecond")) { - elementsPerSecond = table.getProperties().getInteger("elementsPerSecond"); + if (table.getProperties().has("elementsPerSecond")) { + elementsPerSecond = table.getProperties().get("elementsPerSecond").asInt(); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java index 051a794e0225..5e0851a3685e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java @@ -152,10 +152,10 @@ public InMemoryTable(TableWithRows tableWithRows) { // The reason for introducing a property here is to simplify writing unit tests, testing // project and predicate push-down behavior when run separate and together. - if (tableWithRows.table.getProperties().containsKey(PUSH_DOWN_OPTION)) { + if (tableWithRows.table.getProperties().has(PUSH_DOWN_OPTION)) { options = PushDownOptions.valueOf( - tableWithRows.table.getProperties().getString(PUSH_DOWN_OPTION).toUpperCase()); + tableWithRows.table.getProperties().get(PUSH_DOWN_OPTION).asText().toUpperCase()); } else { options = PushDownOptions.NONE; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 91dc137f4b1b..3ddd78ab232b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -22,8 +22,8 @@ import static org.apache.beam.sdk.util.RowJsonUtils.jsonToRow; import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.io.Serializable; @@ -52,7 +52,6 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.commons.csv.CSVFormat; import org.checkerframework.checker.nullness.qual.Nullable; @@ -89,9 +88,9 @@ public BeamSqlTable buildBeamSqlTable(Table table) { Schema schema = table.getSchema(); String filePattern = table.getLocation(); - JSONObject properties = table.getProperties(); - String format = MoreObjects.firstNonNull(properties.getString("format"), "csv"); - String deadLetterFile = properties.getString("deadLetterFile"); + ObjectNode properties = table.getProperties(); + String format = properties.path("format").asText("csv"); + String deadLetterFile = properties.path("deadLetterFile").asText(null); // Backwards compatibility: previously "type": "text" meant CSV and "format" was where the // CSV format went. So assume that any other format is the CSV format. @@ -103,7 +102,7 @@ public BeamSqlTable buildBeamSqlTable(Table table) { switch (format) { case "csv": - String specifiedCsvFormat = properties.getString("csvformat"); + String specifiedCsvFormat = properties.path("csvformat").asText(null); CSVFormat csvFormat = specifiedCsvFormat != null ? CSVFormat.valueOf(specifiedCsvFormat) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java index c43caf0f7fd8..704a9d4586e1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java @@ -22,9 +22,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.stream.Stream; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.ParseException; import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; @@ -50,8 +51,8 @@ public void testParseCreateExternalTable_full() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); BeamSqlEnv env = BeamSqlEnv.withTableProvider(tableProvider); - JSONObject properties = new JSONObject(); - JSONArray hello = new JSONArray(); + ObjectNode properties = TableUtils.emptyProperties(); + ArrayNode hello = TableUtils.getObjectMapper().createArrayNode(); hello.add("james"); hello.add("bond"); properties.put("hello", hello); @@ -115,8 +116,8 @@ public void testParseCreateExternalTable_withoutTableComment() throws Exception TestTableProvider tableProvider = new TestTableProvider(); BeamSqlEnv env = BeamSqlEnv.withTableProvider(tableProvider); - JSONObject properties = new JSONObject(); - JSONArray hello = new JSONArray(); + ObjectNode properties = TableUtils.emptyProperties(); + ArrayNode hello = TableUtils.getObjectMapper().createArrayNode(); hello.add("james"); hello.add("bond"); properties.put("hello", hello); @@ -145,7 +146,7 @@ public void testParseCreateExternalTable_withoutTblProperties() throws Exception + "COMMENT 'person table' \n" + "LOCATION '/home/admin/person'\n"); assertEquals( - mockTable("person", "text", "person table", new JSONObject()), + mockTable("person", "text", "person table", TableUtils.emptyProperties()), tableProvider.getTables().get("person")); } @@ -162,7 +163,7 @@ public void testParseCreateExternalTable_withoutLocation() throws Exception { + "COMMENT 'person table' \n"); assertEquals( - mockTable("person", "text", "person table", new JSONObject(), null), + mockTable("person", "text", "person table", TableUtils.emptyProperties(), null), tableProvider.getTables().get("person")); } @@ -180,7 +181,7 @@ public void testParseCreateExternalTable_minimal() throws Exception { .schema( Stream.of(Schema.Field.of("id", CalciteUtils.INTEGER).withNullable(true)) .collect(toSchema())) - .properties(new JSONObject()) + .properties(TableUtils.emptyProperties()) .build(), tableProvider.getTables().get("person")); } @@ -249,12 +250,12 @@ public void unparseAggregateFunction() { sqlWriter.toSqlString().getSql()); } - private static Table mockTable(String name, String type, String comment, JSONObject properties) { + private static Table mockTable(String name, String type, String comment, ObjectNode properties) { return mockTable(name, type, comment, properties, "/home/admin/" + name); } private static Table mockTable( - String name, String type, String comment, JSONObject properties, String location) { + String name, String type, String comment, ObjectNode properties, String location) { return Table.builder() .name(name) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java index 5d13af947777..593febb9f190 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java @@ -22,7 +22,7 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import com.alibaba.fastjson.JSON; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; @@ -150,7 +150,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java index d8af7fbdea2e..32f59ddad79b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java @@ -23,10 +23,10 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import com.alibaba.fastjson.JSON; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -170,7 +170,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java index e4f173a27e53..0bab4a109d61 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider; -import com.alibaba.fastjson.JSON; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -57,7 +57,7 @@ public class SchemaIOTableProviderWrapperTest { .name("table") .comment("table") .schema(inputSchema) - .properties(JSON.parseObject("{}")) + .properties(TableUtils.parseProperties("{}")) .type("test") .build(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilterTest.java index 6c3c43f04727..9ef2c5ebbd89 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilterTest.java @@ -21,8 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; -import com.alibaba.fastjson.JSON; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; @@ -110,7 +110,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java index d113b10060ee..4486d00885be 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import com.alibaba.fastjson.JSON; import java.util.stream.Stream; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method; @@ -67,7 +67,7 @@ public void testDefaultMethod_whenPropertiesAreNotSet() { public void testSelectDefaultMethodExplicitly() { Table table = fakeTableWithProperties( - "hello", "{ " + METHOD_PROPERTY + ": " + "\"" + Method.DEFAULT.toString() + "\" }"); + "hello", "{" + METHOD_PROPERTY + ": " + "\"" + Method.DEFAULT.toString() + "\" }"); BigQueryTable sqlTable = (BigQueryTable) provider.buildBeamSqlTable(table); assertEquals(Method.DEFAULT, sqlTable.method); @@ -77,7 +77,7 @@ public void testSelectDefaultMethodExplicitly() { public void testSelectDirectReadMethod() { Table table = fakeTableWithProperties( - "hello", "{ " + METHOD_PROPERTY + ": " + "\"" + Method.DIRECT_READ.toString() + "\" }"); + "hello", "{" + METHOD_PROPERTY + ": " + "\"" + Method.DIRECT_READ.toString() + "\" }"); BigQueryTable sqlTable = (BigQueryTable) provider.buildBeamSqlTable(table); assertEquals(Method.DIRECT_READ, sqlTable.method); @@ -87,7 +87,7 @@ public void testSelectDirectReadMethod() { public void testSelectExportMethod() { Table table = fakeTableWithProperties( - "hello", "{ " + METHOD_PROPERTY + ": " + "\"" + Method.EXPORT.toString() + "\" }"); + "hello", "{" + METHOD_PROPERTY + ": " + "\"" + Method.EXPORT.toString() + "\" }"); BigQueryTable sqlTable = (BigQueryTable) provider.buildBeamSqlTable(table); assertEquals(Method.EXPORT, sqlTable.method); @@ -143,7 +143,7 @@ public void testSelectWriteDispositionMethodEmpty() { @Test public void testRuntimeExceptionThrown_whenAnInvalidPropertyIsSpecified() { - Table table = fakeTableWithProperties("hello", "{ " + METHOD_PROPERTY + ": \"blahblah\" }"); + Table table = fakeTableWithProperties("hello", "{" + METHOD_PROPERTY + ": \"blahblah\" }"); assertThrows( RuntimeException.class, @@ -154,7 +154,7 @@ public void testRuntimeExceptionThrown_whenAnInvalidPropertyIsSpecified() { @Test public void testRuntimeExceptionThrown_whenAPropertyOfInvalidTypeIsSpecified() { - Table table = fakeTableWithProperties("hello", "{ " + METHOD_PROPERTY + ": 1337 }"); + Table table = fakeTableWithProperties("hello", "{" + METHOD_PROPERTY + ": 1337 }"); assertThrows( RuntimeException.class, @@ -188,7 +188,7 @@ private static Table fakeTableWithProperties(String name, String properties) { Schema.Field.nullable("name", Schema.FieldType.STRING)) .collect(toSchema())) .type("bigquery") - .properties(JSON.parseObject(properties)) + .properties(TableUtils.parseProperties(properties)) .build(); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java index ee0e06a37516..d6ab009e0e33 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; - import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; @@ -59,7 +57,7 @@ public BeamSqlTable buildBeamSqlTable(Table table) { table, BigQueryUtils.ConversionOptions.builder() .setTruncateTimestamps( - firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false) + table.getProperties().path("truncateTimestamps").asBoolean(false) ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT) .build()); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java index dade31390f6a..6348edcc9730 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java @@ -22,9 +22,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; -import com.alibaba.fastjson.JSON; import java.util.Arrays; import java.util.Collection; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; @@ -100,7 +100,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java index 08082e85e0e1..6a038842becb 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java @@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule; import java.io.IOException; import org.apache.beam.sdk.extensions.sql.BeamSqlCli; @@ -95,9 +95,9 @@ public void testCreatesFlatSchemaCorrectly() { assertNotNull(table); assertEquals(TEST_FLAT_SCHEMA, table.getSchema()); - JSONObject properties = table.getProperties(); - assertTrue(properties.containsKey(COLUMNS_MAPPING)); - assertEquals(columnsMappingString(), properties.getString(COLUMNS_MAPPING)); + ObjectNode properties = table.getProperties(); + assertTrue(properties.has(COLUMNS_MAPPING)); + assertEquals(columnsMappingString(), properties.get(COLUMNS_MAPPING).asText()); } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java index 245df92e8850..1a1c06c76edf 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; -import com.alibaba.fastjson.JSON; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; @@ -25,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; @@ -94,7 +94,7 @@ protected BeamKafkaTable getBeamKafkaTable() { .type("kafka") .schema(TEST_SCHEMA) .location("localhost/mytopic") - .properties(JSON.parseObject("{ \"format\": \"avro\" }")) + .properties(TableUtils.parseProperties("{ \"format\": \"avro\" }")) .build()); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java index e5f00e010852..a3611b00c142 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java @@ -19,8 +19,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import com.alibaba.fastjson.JSON; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; @@ -67,7 +67,7 @@ protected BeamKafkaTable getBeamKafkaTable() { .type("kafka") .schema(TEST_SCHEMA) .location("localhost/mytopic") - .properties(JSON.parseObject("{ \"format\": \"json\" }")) + .properties(TableUtils.parseProperties("{ \"format\": \"json\" }")) .build()); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java index b4bd4ec2e501..7fa94aa9daa3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java @@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; -import com.alibaba.fastjson.JSON; import java.util.List; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.extensions.protobuf.PayloadMessages; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.kafka.KafkaRecordCoder; import org.apache.beam.sdk.io.kafka.ProducerRecordCoder; @@ -94,7 +94,7 @@ private static BeamKafkaTable getBeamKafkaTable(Schema schema) { .schema(schema) .location("localhost/mytopic") .properties( - JSON.parseObject( + TableUtils.parseProperties( "{ \"format\": \"proto\", \"protoClass\": \"" + PayloadMessages.TestMessage.class.getName() + "\" }")) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java index 2b457b6cc2f5..0463e4c65109 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java @@ -21,9 +21,9 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; -import com.alibaba.fastjson.JSON; import java.util.List; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.kafka.KafkaRecordCoder; import org.apache.beam.sdk.io.kafka.ProducerRecordCoder; @@ -100,7 +100,7 @@ private static BeamKafkaTable getBeamKafkaTable(Schema schema) { .schema(schema) .location("localhost/mytopic") .properties( - JSON.parseObject( + TableUtils.parseProperties( "{ \"format\": \"thrift\", \"thriftClass\": \"" + TestThriftMessage.class.getName() + "\", \"thriftProtocolFactoryClass\": \"" diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java index 97cd2c313a99..e9794f2f5b3d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import com.alibaba.fastjson.JSON; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -40,6 +39,7 @@ import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.PayloadMessages; import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -153,7 +153,7 @@ public void testFake2() throws BeamKafkaTable.NoEstimationException { .location(buildLocation()) .schema(TEST_TABLE_SCHEMA) .type("kafka") - .properties(JSON.parseObject(objectsProvider.getKafkaPropertiesString())) + .properties(TableUtils.parseProperties(objectsProvider.getKafkaPropertiesString())) .build(); BeamKafkaTable kafkaTable = (BeamKafkaTable) new KafkaTableProvider().buildBeamSqlTable(table); produceSomeRecordsWithDelay(100, 20); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java index 795523a795c6..09211ba06ba0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java @@ -21,10 +21,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.List; import org.apache.beam.sdk.extensions.protobuf.PayloadMessages; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage; @@ -204,16 +205,20 @@ private static Table mockTable( @Nullable Class protoClass, @Nullable Class> thriftClass, @Nullable Class thriftProtocolFactoryClass) { - JSONObject properties = new JSONObject(); + ObjectNode properties = TableUtils.emptyProperties(); if (extraBootstrapServers != null) { - JSONArray bootstrapServers = new JSONArray(); - bootstrapServers.addAll(extraBootstrapServers); + ArrayNode bootstrapServers = TableUtils.getObjectMapper().createArrayNode(); + for (String server : extraBootstrapServers) { + bootstrapServers.add(server); + } properties.put("bootstrap_servers", bootstrapServers); } if (extraTopics != null) { - JSONArray topics = new JSONArray(); - topics.addAll(extraTopics); + ArrayNode topics = TableUtils.getObjectMapper().createArrayNode(); + for (String topic : extraTopics) { + topics.add(topic); + } properties.put("topics", topics); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbFilterTest.java index aafaf9b9396d..648f033630a9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbFilterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbFilterTest.java @@ -21,9 +21,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; -import com.alibaba.fastjson.JSON; import java.util.Arrays; import java.util.Collection; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; @@ -115,7 +115,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderTest.java index e778e2f46e1a..95af75ed312c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderTest.java @@ -21,7 +21,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR; import static org.junit.Assert.assertEquals; -import com.alibaba.fastjson.JSON; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.schemas.Schema; @@ -108,6 +108,6 @@ private static Table.Builder tableDefinition() { .location("projects/project/topics/topic") .schema(Schema.builder().build()) .type("pubsub") - .properties(JSON.parseObject("{ \"timestampAttributeKey\" : \"ts_field\" }")); + .properties(TableUtils.parseProperties("{ \"timestampAttributeKey\" : \"ts_field\" }")); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProviderTest.java index 12bdfffe3ac0..4cdad7180b8c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsublite/PubsubLiteTableProviderTest.java @@ -21,12 +21,12 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import com.alibaba.fastjson.JSONObject; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import java.util.Map; import java.util.function.Function; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.schemas.Schema; @@ -65,7 +65,7 @@ private static BeamSqlTable makeTable( .name("testTable") .schema(schema) .location(location) - .properties(new JSONObject().fluentPutAll(properties)) + .properties(TableUtils.getObjectMapper().valueToTree(properties)) .build(); return PROVIDER.buildBeamSqlTable(table); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java index c367afa9ae5a..a91b28210df6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java @@ -19,7 +19,7 @@ import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION; -import com.alibaba.fastjson.JSON; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions; @@ -120,8 +120,13 @@ private static Table getTable(String name) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject( - "{ " + PUSH_DOWN_OPTION + ": " + "\"" + PushDownOptions.BOTH.toString() + "\" }")) + TableUtils.parseProperties( + "{ \"" + + PUSH_DOWN_OPTION + + "\": " + + "\"" + + PushDownOptions.BOTH.toString() + + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java index 3eabb62e58fb..ce9f52ffea94 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import com.alibaba.fastjson.JSON; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; @@ -409,7 +409,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java index 12d8a98cd00d..eae1dfb49ee8 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import com.alibaba.fastjson.JSON; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; @@ -290,7 +290,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java index 6b94302acbf9..4fa45cbab9ec 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import com.alibaba.fastjson.JSON; import java.util.List; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; @@ -255,7 +255,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java index 2cf835c38f64..825f3ed06485 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import com.alibaba.fastjson.JSONObject; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; @@ -125,7 +125,7 @@ private static Table mockTable(String name, String type) { Schema.Field.nullable("name", Schema.FieldType.STRING)) .collect(toSchema())) .type(type) - .properties(new JSONObject()) + .properties(TableUtils.emptyProperties()) .build(); } diff --git a/sdks/java/extensions/sql/zetasql/build.gradle b/sdks/java/extensions/sql/zetasql/build.gradle index e3976400b6b8..8d6e2aac0bf4 100644 --- a/sdks/java/extensions/sql/zetasql/build.gradle +++ b/sdks/java/extensions/sql/zetasql/build.gradle @@ -61,7 +61,7 @@ dependencies { testImplementation library.java.hamcrest testImplementation library.java.mockito_core testImplementation library.java.quickcheck_core - testImplementation "com.alibaba:fastjson:1.2.69" + testImplementation library.java.jackson_databind testImplementation "org.codehaus.janino:janino:3.0.11" testCompileOnly project(":sdks:java:extensions:sql:udf-test-provider") testRuntimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java index 031eda7c4e51..fd2b7dc7ca5e 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java @@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; -import com.alibaba.fastjson.JSON; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection; import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver; @@ -220,7 +220,8 @@ private static Table getTable(String name, PushDownOptions options) { .comment(name + " table") .schema(BASIC_SCHEMA) .properties( - JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) .type("test") .build(); } diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle index 48538146fc27..87d7ed11407a 100644 --- a/sdks/java/testing/tpcds/build.gradle +++ b/sdks/java/testing/tpcds/build.gradle @@ -66,7 +66,7 @@ dependencies { implementation library.java.commons_csv implementation library.java.slf4j_api implementation "com.googlecode.json-simple:json-simple:1.1.1" - implementation "com.alibaba:fastjson:1.2.69" + implementation library.java.jackson_databind implementation project(":sdks:java:extensions:sql") implementation project(":sdks:java:extensions:sql:zetasql") implementation project(":sdks:java:io:parquet") diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java index 2a238cb1e9a5..4beab4bca578 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java @@ -19,7 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; @@ -99,7 +100,7 @@ private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSiz */ private static void registerAllTablesByInMemoryMetaStore( InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception { - JSONObject properties = new JSONObject(); + ObjectNode properties = TableUtils.emptyProperties(); properties.put("csvformat", "InformixUnload"); Map schemaMap = TpcdsSchemas.getTpcdsSchemas();