From 6f8476acdb1d82205e35250c6c56495a98b83eff Mon Sep 17 00:00:00 2001 From: wenningd Date: Wed, 9 Dec 2020 15:52:23 -0800 Subject: [PATCH] [HUDI-1040] Make Hudi support Spark 3 (#2208) * Fix flaky MOR unit test * Update Spark APIs to make it be compatible with both spark2 & spark3 * Refactor bulk insert v2 part to make Hudi be able to compile with Spark3 * Add spark3 profile to handle fasterxml & spark version * Create hudi-spark-common module & refactor hudi-spark related modules Co-authored-by: Wenning Ding --- LICENSE | 2 + README.md | 8 + .../client/utils/SparkRowDeserializer.java | 28 +++ .../org/apache/hudi/AvroConversionUtils.scala | 28 +-- .../hudi/io/TestHoodieRowCreateHandle.java | 7 +- .../TestHoodieInternalRowParquetWriter.java | 3 +- .../testutils/HoodieMergeOnReadTestUtils.java | 59 ++--- .../hudi/testutils/SparkDatasetTestUtils.java | 20 +- .../view/RemoteHoodieTableFileSystemView.java | 2 +- hudi-integ-test/pom.xml | 8 +- .../testsuite/reader/SparkBasedReader.java | 6 +- .../org/apache/hudi/integ/ITTestBase.java | 6 +- .../hudi-spark-common/pom.xml | 178 ++++++++++++++ .../java/org/apache/hudi/DataSourceUtils.java | 0 .../org/apache/hudi/DataSourceOptions.scala | 4 +- .../hudi-spark}/pom.xml | 22 +- .../hudi-spark}/run_hoodie_app.sh | 2 +- .../hudi-spark}/run_hoodie_generate_app.sh | 2 +- .../hudi-spark}/run_hoodie_streaming_app.sh | 2 +- .../apache/hudi/HoodieDataSourceHelpers.java | 0 .../hudi/HoodieDatasetBulkInsertHelper.java | 0 .../java/org/apache/hudi/QuickstartUtils.java | 0 .../SparkStreamingAsyncCompactService.java | 0 .../SparkParquetBootstrapDataProvider.java | 4 +- .../HoodieDeltaStreamerException.java | 0 .../hudi/payload/AWSDmsAvroPayload.java | 0 ...pache.spark.sql.sources.DataSourceRegister | 0 .../scala/org/apache/hudi/DefaultSource.scala | 0 .../org/apache/hudi/HoodieBootstrapRDD.scala | 0 .../apache/hudi/HoodieBootstrapRelation.scala | 0 .../org/apache/hudi/HoodieEmptyRelation.scala | 0 .../apache/hudi/HoodieMergeOnReadRDD.scala | 0 .../apache/hudi/HoodieSparkSqlWriter.scala | 8 +- .../org/apache/hudi/HoodieSparkUtils.scala | 121 ++++++++++ .../org/apache/hudi/HoodieStreamingSink.scala | 0 .../org/apache/hudi/HoodieWriterUtils.scala | 0 .../org/apache/hudi/IncrementalRelation.scala | 0 .../hudi/MergeOnReadSnapshotRelation.scala | 3 - .../main/scala/org/apache/hudi/package.scala | 0 .../src/test/java/HoodieJavaApp.java | 0 .../src/test/java/HoodieJavaGenerateApp.java | 0 .../src/test/java/HoodieJavaStreamingApp.java | 4 +- .../org/apache/hudi/TestDataSourceUtils.java | 0 .../TestHoodieDatasetBulkInsertHelper.java | 0 .../org/apache/hudi/client/TestBootstrap.java | 0 .../hudi/payload/TestAWSDmsAvroPayload.java | 0 .../hudi/testutils/DataSourceTestUtils.java | 0 .../src/test/resources/exampleSchema.txt | 0 .../resources/log4j-surefire-quiet.properties | 0 .../test/resources/log4j-surefire.properties | 0 .../hudi/TestAvroConversionHelper.scala | 0 .../apache/hudi/TestDataSourceDefaults.scala | 0 .../apache/hudi/TestHoodieSparkUtils.scala | 0 .../HoodieSparkSqlWriterSuite.scala | 0 .../hudi/functional/TestCOWDataSource.scala | 0 .../TestDataSourceForBootstrap.scala | 0 .../hudi/functional/TestMORDataSource.scala | 0 .../functional/TestStructuredStreaming.scala | 4 +- hudi-spark-datasource/hudi-spark2/pom.xml | 225 ++++++++++++++++++ .../apache/hudi/internal/DefaultSource.java | 0 .../HoodieBulkInsertDataInternalWriter.java | 0 ...ieBulkInsertDataInternalWriterFactory.java | 0 .../HoodieDataSourceInternalWriter.java | 0 
.../internal/HoodieWriterCommitMessage.java | 0 .../apache/hudi/Spark2RowDeserializer.scala | 30 +++ ...estHoodieBulkInsertDataInternalWriter.java | 12 +- .../TestHoodieDataSourceInternalWriter.java | 15 +- hudi-spark-datasource/hudi-spark3/pom.xml | 163 +++++++++++++ .../apache/hudi/Spark3RowDeserializer.scala | 33 +++ hudi-spark-datasource/pom.xml | 39 +++ .../org/apache/hudi/HoodieSparkUtils.scala | 50 ---- hudi-utilities/pom.xml | 15 ++ .../apache/hudi/utilities/UtilHelpers.java | 4 +- .../utilities/deltastreamer/DeltaSync.java | 6 +- .../deltastreamer/SourceFormatAdapter.java | 5 +- packaging/hudi-integ-test-bundle/pom.xml | 14 ++ packaging/hudi-spark-bundle/pom.xml | 18 ++ packaging/hudi-utilities-bundle/pom.xml | 19 ++ pom.xml | 33 ++- 79 files changed, 1040 insertions(+), 172 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java create mode 100644 hudi-spark-datasource/hudi-spark-common/pom.xml rename {hudi-spark => hudi-spark-datasource/hudi-spark-common}/src/main/java/org/apache/hudi/DataSourceUtils.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark-common}/src/main/scala/org/apache/hudi/DataSourceOptions.scala (99%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/pom.xml (94%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/run_hoodie_app.sh (93%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/run_hoodie_generate_app.sh (93%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/run_hoodie_streaming_app.sh (94%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/QuickstartUtils.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java (95%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/DefaultSource.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala (98%) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala rename {hudi-spark => 
hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/IncrementalRelation.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala (97%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/main/scala/org/apache/hudi/package.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/HoodieJavaApp.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/HoodieJavaGenerateApp.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/HoodieJavaStreamingApp.java (99%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/org/apache/hudi/TestDataSourceUtils.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/org/apache/hudi/client/TestBootstrap.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/resources/exampleSchema.txt (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/resources/log4j-surefire-quiet.properties (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/resources/log4j-surefire.properties (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark}/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala (98%) create mode 100644 hudi-spark-datasource/hudi-spark2/pom.xml rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/main/java/org/apache/hudi/internal/DefaultSource.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java (100%) rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java (100%) rename {hudi-spark => 
hudi-spark-datasource/hudi-spark2}/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java (100%) create mode 100644 hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java (96%) rename {hudi-spark => hudi-spark-datasource/hudi-spark2}/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java (96%) create mode 100644 hudi-spark-datasource/hudi-spark3/pom.xml create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala create mode 100644 hudi-spark-datasource/pom.xml delete mode 100644 hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala diff --git a/LICENSE b/LICENSE index 1e2174731f16..385191d1b9ef 100644 --- a/LICENSE +++ b/LICENSE @@ -246,6 +246,8 @@ This product includes code from Apache Spark * org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package +* org.apache.hudi.HoodieSparkUtils.scala copied some methods from org.apache.spark.deploy.SparkHadoopUtil.scala + Copyright: 2014 and onwards The Apache Software Foundation Home page: http://spark.apache.org/ License: http://www.apache.org/licenses/LICENSE-2.0 diff --git a/README.md b/README.md index 2416b3f92cc9..427d8595f436 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,14 @@ The default Scala version supported is 2.11. To build for Scala 2.12 version, bu mvn clean package -DskipTests -Dscala-2.12 ``` +### Build with Spark 3.0.0 + +The default Spark version supported is 2.4.4. To build for Spark 3.0.0 version, build using `spark3` profile + +``` +mvn clean package -DskipTests -Dspark3 +``` + ### Build without spark-avro module The default hudi-jar bundles spark-avro module. To build without spark-avro module, build using `spark-shade-unbundle-avro` profile diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java new file mode 100644 index 000000000000..66b8b78b5692 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.utils; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; + +import java.io.Serializable; + +public interface SparkRowDeserializer extends Serializable { + Row deserializeRow(InternalRow internalRow); +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index d1a4249f7962..88101265de29 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -21,41 +21,15 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.common.model.HoodieKey import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.{Dataset, Row, SparkSession} import scala.collection.JavaConverters._ object AvroConversionUtils { - def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { - val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) - createRdd(df, avroSchema, structName, recordNamespace) - } - - def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String) - : RDD[GenericRecord] = { - // Use the Avro schema to derive the StructType which has the correct nullability information - val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder.apply(dataType).resolveAndBind() - df.queryExecution.toRdd.map(encoder.fromRow) - .mapPartitions { records => - if (records.isEmpty) Iterator.empty - else { - val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) - records.map { x => convertor(x).asInstanceOf[GenericRecord] } - } - } - } - - def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = { - df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField))) - } - def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = { if (rdd.isEmpty()) { ss.emptyDataFrame diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java index c7a313ab96b3..edce77772d40 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java @@ -72,7 +72,7 @@ public void tearDown() throws Exception { } @Test - public void testRowCreateHandle() throws IOException { + public void testRowCreateHandle() throws Exception { // init config and table HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); @@ -113,7 +113,7 @@ public void testRowCreateHandle() throws IOException { * should be thrown. 
*/ @Test - public void testGlobalFailure() throws IOException { + public void testGlobalFailure() throws Exception { // init config and table HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); @@ -179,7 +179,8 @@ public void testInstantiationFailure() throws IOException { } } - private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset inputRows, HoodieRowCreateHandle handle) throws IOException { + private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset inputRows, HoodieRowCreateHandle handle) + throws Exception { List internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER); // issue writes for (InternalRow internalRow : internalRows) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java index 37b8cdca88bb..2b344db7e132 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java @@ -35,7 +35,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.List; import java.util.Random; import java.util.UUID; @@ -64,7 +63,7 @@ public void tearDown() throws Exception { } @Test - public void endToEndTest() throws IOException { + public void endToEndTest() throws Exception { HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build(); for (int i = 0; i < 5; i++) { // init write support and parquet config diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 8104ef7744fc..56335511201f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -43,7 +43,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -84,36 +83,32 @@ public static List getRecordsUsingInputFormat(Configuration conf, .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())) .collect(Collectors.toList())); - return inputPaths.stream().map(path -> { - setInputPath(jobConf, path); - List records = new ArrayList<>(); - try { - List splits = Arrays.asList(inputFormat.getSplits(jobConf, 1)); - for (InputSplit split : splits) { - RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null); - Object key = recordReader.createKey(); - ArrayWritable writable = (ArrayWritable) recordReader.createValue(); - while (recordReader.next(key, writable)) { - GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema); - // writable returns an array with [field1, field2, _hoodie_commit_time, - // _hoodie_commit_seqno] - Writable[] values = writable.get(); - schema.getFields().stream() - .filter(f -> !projectCols || projectedColumns.contains(f.name())) - .map(f -> Pair.of(projectedSchema.getFields().stream() - .filter(p -> f.name().equals(p.name())).findFirst().get(), f)) - 
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()])); - records.add(newRecord.build()); - } + List records = new ArrayList<>(); + try { + FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths)); + InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size()); + + for (InputSplit split : splits) { + RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null); + Object key = recordReader.createKey(); + ArrayWritable writable = (ArrayWritable) recordReader.createValue(); + while (recordReader.next(key, writable)) { + GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema); + // writable returns an array with [field1, field2, _hoodie_commit_time, + // _hoodie_commit_seqno] + Writable[] values = writable.get(); + schema.getFields().stream() + .filter(f -> !projectCols || projectedColumns.contains(f.name())) + .map(f -> Pair.of(projectedSchema.getFields().stream() + .filter(p -> f.name().equals(p.name())).findFirst().get(), f)) + .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()])); + records.add(newRecord.build()); } - } catch (IOException ie) { - ie.printStackTrace(); } - return records; - }).reduce((a, b) -> { - a.addAll(b); - return a; - }).orElse(new ArrayList<>()); + } catch (IOException ie) { + ie.printStackTrace(); + } + return records; } private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List projectedCols) { @@ -156,10 +151,4 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf configurable.setConf(conf); jobConf.addResource(conf); } - - private static void setInputPath(JobConf jobConf, String inputPath) { - jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); - jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); - jobConf.set("map.input.dir", inputPath); - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java index 04a1712a2197..3d2019dbdcef 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.spark.package$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; @@ -41,6 +42,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -139,11 +142,11 @@ public static Row getRandomValue(String partitionPath, boolean isError) { * @param rows Datasets to be converted * @return the List of {@link InternalRow}s thus converted. 
*/ - public static List toInternalRows(Dataset rows, ExpressionEncoder encoder) { + public static List toInternalRows(Dataset rows, ExpressionEncoder encoder) throws Exception { List toReturn = new ArrayList<>(); List rowList = rows.collectAsList(); for (Row row : rowList) { - toReturn.add(encoder.toRow(row).copy()); + toReturn.add(serializeRow(encoder, row).copy()); } return toReturn; } @@ -173,4 +176,17 @@ public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) { .withBulkInsertParallelism(2); } + private static InternalRow serializeRow(ExpressionEncoder encoder, Row row) + throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException { + // TODO remove reflection if Spark 2.x support is dropped + if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) { + Method spark2method = encoder.getClass().getMethod("toRow", Object.class); + return (InternalRow) spark2method.invoke(encoder, row); + } else { + Class serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer"); + Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder); + Method aboveSpark2method = serializerClass.getMethod("apply", Object.class); + return (InternalRow) aboveSpark2method.invoke(serializer, row); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index d42592bd1ece..91a28a861fad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -171,7 +171,7 @@ private T executeRequest(String requestPath, Map queryParame break; } String content = response.returnContent().asString(); - return mapper.readValue(content, reference); + return (T) mapper.readValue(content, reference); } private Map getParamsWithPartitionPath(String partitionPath) { diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index e9fcc61e013b..90ca94dbb4ae 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -206,12 +206,11 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml - 2.7.4 + ${fasterxml.jackson.dataformat.yaml.version} com.fasterxml.jackson.core jackson-databind - 2.6.7.3 @@ -220,11 +219,6 @@ jackson-annotations test - - com.fasterxml.jackson.core - jackson-databind - test - com.fasterxml.jackson.datatype jackson-datatype-guava diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java index 16a5259ee94b..fc23a47b3533 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java @@ -20,7 +20,7 @@ import java.util.List; import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.spark.api.java.JavaRDD; @@ -49,7 +49,7 @@ public static JavaRDD readAvro(SparkSession sparkSession, String .option(AVRO_SCHEMA_OPTION_KEY, schemaStr) 
.load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()); - return AvroConversionUtils + return HoodieSparkUtils .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)) .toJavaRDD(); @@ -61,7 +61,7 @@ public static JavaRDD readParquet(SparkSession sparkSession, List Dataset dataSet = sparkSession.read() .parquet((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq())); - return AvroConversionUtils + return HoodieSparkUtils .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) .toJavaRDD(); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 8cb64f83e23a..80ed1d4bf40f 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -61,9 +61,9 @@ public abstract class ITTestBase { protected static final String HIVESERVER = "/hiveserver"; protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1"; protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws"; - protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh"; - protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh"; - protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh"; + protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh"; + protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh"; + protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh"; protected static final String HUDI_HADOOP_BUNDLE = HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar"; protected static final String HUDI_HIVE_SYNC_BUNDLE = diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml new file mode 100644 index 000000000000..af6403dc59c8 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/pom.xml @@ -0,0 +1,178 @@ + + + + + hudi-spark-datasource + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-spark-common + ${parent.version} + + hudi-spark-common + jar + + + ${project.parent.parent.basedir} + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + org.scalastyle + scalastyle-maven-plugin + 
+ + org.jacoco + jacoco-maven-plugin + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + + org.apache.hudi + hudi-client-common + ${project.version} + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-hive-sync + ${project.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala similarity index 99% rename from hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 8a3ccf7d5b55..1b6e49b456a0 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -36,7 +36,7 @@ import org.apache.log4j.LogManager */ object DataSourceReadOptions { - private val log = LogManager.getLogger(classOf[DefaultSource]) + private val log = LogManager.getLogger(DataSourceReadOptions.getClass) /** * Whether data needs to be read, in @@ -143,7 +143,7 @@ object DataSourceReadOptions { */ object DataSourceWriteOptions { - private val log = LogManager.getLogger(classOf[DefaultSource]) + private val log = LogManager.getLogger(DataSourceWriteOptions.getClass) /** * The write operation, that this write should do diff --git a/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml similarity index 94% rename from hudi-spark/pom.xml rename to hudi-spark-datasource/hudi-spark/pom.xml index 0942327542a1..65efdce0a4f7 100644 --- a/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -17,17 +17,20 @@ --> - hudi + hudi-spark-datasource org.apache.hudi 0.6.1-SNAPSHOT 4.0.0 hudi-spark_${scala.binary.version} + ${parent.version} + + hudi-spark_${scala.binary.version} jar - ${project.parent.basedir} + ${project.parent.parent.basedir} @@ -196,6 +199,21 @@ hudi-sync-common ${project.version} + + org.apache.hudi + hudi-spark-common + ${project.version} + + + org.apache.hudi + hudi-spark2_${scala.binary.version} + ${project.version} + + + org.apache.hudi + hudi-spark3_2.12 + ${project.version} + diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh similarity index 93% rename from hudi-spark/run_hoodie_app.sh rename to hudi-spark-datasource/hudi-spark/run_hoodie_app.sh index 7c63e7411eb4..9782aa359556 100755 --- a/hudi-spark/run_hoodie_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git 
a/hudi-spark/run_hoodie_generate_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh similarity index 93% rename from hudi-spark/run_hoodie_generate_app.sh rename to hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh index a4b4090b049a..a2769517b9eb 100755 --- a/hudi-spark/run_hoodie_generate_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-spark/run_hoodie_streaming_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh similarity index 94% rename from hudi-spark/run_hoodie_streaming_app.sh rename to hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh index 01f1a4e4a13f..9a81a4c0684e 100755 --- a/hudi-spark/run_hoodie_streaming_app.sh +++ b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh @@ -23,7 +23,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` +HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java similarity index 95% rename from 
hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index 022abe3085de..6c5eb0ed5748 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -18,8 +18,8 @@ package org.apache.hudi.bootstrap; -import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; @@ -65,7 +65,7 @@ public JavaRDD generateInputRecords(String tableName, String sourc KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props); String structName = tableName + "_record"; String namespace = "hoodie." + tableName; - RDD genericRecords = AvroConversionUtils.createRdd(inputDataset, structName, namespace); + RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace); return genericRecords.toJavaRDD().map(gr -> { String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( gr, props.getString("hoodie.datasource.write.precombine.field"), false); diff --git a/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java diff --git a/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister similarity index 100% rename from hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister rename to hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala similarity index 98% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b10a05b025e0..d66103600f9a 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -41,6 +41,7 @@ import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.internal.HoodieDataSourceInternalWriter import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager +import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD @@ -129,6 +130,9 @@ private[hudi] object HoodieSparkSqlWriter { // scalastyle:off if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean && operation == WriteOperationType.BULK_INSERT) { + if (!SPARK_VERSION.startsWith("2.")) { + throw new HoodieException("Bulk insert using row writer is not supported with Spark 3. 
To use row writer please switch to spark 2.") + } val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName, basePath, path, instantTime) return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) @@ -148,7 +152,7 @@ private[hudi] object HoodieSparkSqlWriter { // Convert to RDD[HoodieRecord] val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, schema, structName, nameSpace) + val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace) val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT); val hoodieAllIncomingRecords = genericRecords.map(gr => { val hoodieRecord = if (shouldCombine) { @@ -195,7 +199,7 @@ private[hudi] object HoodieSparkSqlWriter { // Convert to RDD[HoodieKey] val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace) val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() if (!tableExists) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala new file mode 100644 index 000000000000..02880f22b93f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.client.utils.SparkRowDeserializer +import org.apache.hudi.common.model.HoodieRecord +import org.apache.spark.SPARK_VERSION +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +import scala.collection.JavaConverters._ + + +object HoodieSparkUtils { + + def getMetaSchema: StructType = { + StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { + StructField(col, StringType, nullable = true) + })) + } + + /** + * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. 
+ * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. + */ + def isGlobPath(pattern: Path): Boolean = { + pattern.toString.exists("{}[]*?\\".toSet.contains) + } + + /** + * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. + * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. + */ + def globPath(fs: FileSystem, pattern: Path): Seq[Path] = { + Option(fs.globStatus(pattern)).map { statuses => + statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq + }.getOrElse(Seq.empty[Path]) + } + + /** + * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. + * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. + */ + def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = { + if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern) + } + + /** + * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths + * which match the glob pattern. Otherwise, returns original path + * + * @param paths List of absolute or globbed paths + * @param fs File system + * @return list of absolute file paths + */ + def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { + paths.flatMap(path => { + val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPaths = globPathIfNecessary(fs, qualified) + globPaths + }) + } + + def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = { + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) + } + + def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { + val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) + createRdd(df, avroSchema, structName, recordNamespace) + } + + def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String) + : RDD[GenericRecord] = { + // Use the Avro schema to derive the StructType which has the correct nullability information + val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val encoder = RowEncoder.apply(dataType).resolveAndBind() + val deserializer = HoodieSparkUtils.createDeserializer(encoder) + df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row)) + .mapPartitions { records => + if (records.isEmpty) Iterator.empty + else { + val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) + records.map { x => convertor(x).asInstanceOf[GenericRecord] } + } + } + } + + def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = { + // TODO remove Spark2RowDeserializer if Spark 2.x support is dropped + if (SPARK_VERSION.startsWith("2.")) { + new Spark2RowDeserializer(encoder) + } else { + new Spark3RowDeserializer(encoder) + } + } +} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala 
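Note: the diffstat above lists new Spark2RowDeserializer.scala and Spark3RowDeserializer.scala sources whose bodies are not reproduced in this excerpt. Below is a minimal sketch of what each implementation plausibly looks like; the class shapes and field names are assumptions, not copied from the patch. Each class compiles only against its own Spark line (ExpressionEncoder.fromRow exists in Spark 2.x, createDeserializer only in Spark 3.x), which is why the patch places them in the separate hudi-spark2 and hudi-spark3 modules and dispatches between them in HoodieSparkUtils.createDeserializer based on SPARK_VERSION.

```scala
package org.apache.hudi

import org.apache.hudi.client.utils.SparkRowDeserializer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical Spark 2.x variant: ExpressionEncoder.fromRow still exists in 2.x,
// so the interface can delegate to the resolved-and-bound encoder directly.
class Spark2RowDeserializer(encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {

  override def deserializeRow(internalRow: InternalRow): Row = encoder.fromRow(internalRow)
}

// Hypothetical Spark 3.x variant: fromRow was removed in Spark 3.0, so a reusable
// Deserializer is materialized once via createDeserializer() and applied per row.
class Spark3RowDeserializer(encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {

  private val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer()

  override def deserializeRow(internalRow: InternalRow): Row = deserializer(internalRow)
}
```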
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala similarity index 97% rename from hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index c1a6acdb0425..0b81fa7b804c 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -113,9 +113,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration. - FileSystem.getLocal(jobConf) - SparkHadoopUtil.get.addCredentials(jobConf) val rdd = new HoodieMergeOnReadRDD( sqlContext.sparkContext, jobConf, diff --git a/hudi-spark/src/main/scala/org/apache/hudi/package.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala similarity index 100% rename from hudi-spark/src/main/scala/org/apache/hudi/package.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java similarity index 100% rename from hudi-spark/src/test/java/HoodieJavaApp.java rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java similarity index 100% rename from hudi-spark/src/test/java/HoodieJavaGenerateApp.java rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java similarity index 99% rename from hudi-spark/src/test/java/HoodieJavaStreamingApp.java rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 606490f444e2..1df12a35032a 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -43,7 +43,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.streaming.ProcessingTime; +import org.apache.spark.sql.streaming.Trigger; import java.util.List; import java.util.concurrent.ExecutorService; @@ -366,7 +366,7 @@ public void 
stream(Dataset streamingInput, String operationType, String che .outputMode(OutputMode.Append()); updateHiveSyncConfig(writer); - StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath); + StreamingQuery query = writer.trigger(Trigger.ProcessingTime(500)).start(tablePath); query.awaitTermination(streamingDurationInMs); } diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java diff --git a/hudi-spark/src/test/resources/exampleSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt similarity index 100% rename from hudi-spark/src/test/resources/exampleSchema.txt rename to hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt diff --git a/hudi-spark/src/test/resources/log4j-surefire-quiet.properties b/hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire-quiet.properties similarity index 100% rename from hudi-spark/src/test/resources/log4j-surefire-quiet.properties rename to hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire-quiet.properties diff --git a/hudi-spark/src/test/resources/log4j-surefire.properties b/hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire.properties similarity index 100% rename from hudi-spark/src/test/resources/log4j-surefire.properties rename to hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire.properties diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala similarity index 100% rename from 
hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala similarity index 100% rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala similarity index 98% rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 226cf5313f5d..7a902c14b5f2 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -26,7 +26,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.log4j.LogManager import 
org.apache.spark.sql._ -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} +import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -93,7 +93,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { .writeStream .format("org.apache.hudi") .options(commonOpts) - .trigger(new ProcessingTime(100)) + .trigger(Trigger.ProcessingTime(100)) .option("checkpointLocation", basePath + "/checkpoint") .outputMode(OutputMode.Append) .start(destPath) diff --git a/hudi-spark-datasource/hudi-spark2/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml new file mode 100644 index 000000000000..8947bb79f6d7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/pom.xml @@ -0,0 +1,225 @@ + + + + + hudi-spark-datasource + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-spark2_${scala.binary.version} + ${parent.version} + + hudi-spark2_${scala.binary.version} + jar + + + ${project.parent.parent.basedir} + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.jacoco + jacoco-maven-plugin + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + + org.apache.hudi + hudi-client-common + ${project.version} + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-spark-common + ${project.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark2.version} + true + + + + io.netty + netty + 3.9.9.Final + true + + + io.netty + netty-all + 4.1.17.Final + true + + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java rename to 
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java similarity index 100% rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala new file mode 100644 index 000000000000..84fe4c3e8b28 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.client.utils.SparkRowDeserializer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +class Spark2RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer { + def deserializeRow(internalRow: InternalRow): Row = { + encoder.fromRow(internalRow) + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java similarity index 96% rename from hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java rename to hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index 5a5d8b2700d4..ac69af51eb8b 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -27,6 +27,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.spark.package$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -34,7 +35,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Unit tests {@link HoodieBulkInsertDataInternalWriter}. @@ -61,6 +62,8 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn @BeforeEach public void setUp() throws Exception { + // this test is only compatible with spark 2 + assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2.")); initSparkContexts("TestHoodieBulkInsertDataInternalWriter"); initPath(); initFileSystem(); @@ -74,7 +77,7 @@ public void tearDown() throws Exception { } @Test - public void testDataInternalWriter() throws IOException { + public void testDataInternalWriter() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); @@ -119,7 +122,7 @@ public void testDataInternalWriter() throws IOException { * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk. 
*/ @Test - public void testGlobalFailure() throws IOException { + public void testGlobalFailure() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); @@ -165,7 +168,8 @@ public void testGlobalFailure() throws IOException { assertOutput(inputRows, result, instantTime, fileNames); } - private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) throws IOException { + private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) + throws Exception { List internalRows = toInternalRows(inputRows, ENCODER); // issue writes for (InternalRow internalRow : internalRows) { diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java similarity index 96% rename from hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java rename to hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java index 89d748f671aa..454c74d967cd 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java @@ -26,6 +26,7 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.spark.package$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -34,7 +35,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -49,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Unit tests {@link HoodieDataSourceInternalWriter}. 
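Note on the internal-writer test changes above: these tests exercise the Spark 2 DataSource V2 writer API, so they are now skipped (not failed) when the build runs against Spark 3. A minimal sketch of that guard, rendered in Scala for brevity (the actual tests are Java and call package$.MODULE$.SPARK_VERSION()); the SparkVersionGate object and its helper name are illustrative only, not part of the patch:

    import org.apache.spark.SPARK_VERSION
    import org.junit.jupiter.api.Assumptions.assumeTrue

    object SparkVersionGate {
      // Skip (rather than fail) a test when the runtime Spark major version
      // does not match the API line the test was written against.
      def requireSparkMajorVersion(major: Int): Unit =
        assumeTrue(
          SPARK_VERSION.startsWith(s"$major."),
          s"test requires Spark $major.x, found Spark $SPARK_VERSION")
    }

Calling SparkVersionGate.requireSparkMajorVersion(2) at the top of setUp has the same effect as the inline assumeTrue added in the diffs above.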
@@ -59,6 +60,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness @BeforeEach public void setUp() throws Exception { + // this test is only compatible with spark 2 + assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2.")); initSparkContexts("TestHoodieDataSourceInternalWriter"); initPath(); initFileSystem(); @@ -72,7 +75,7 @@ public void tearDown() throws Exception { } @Test - public void testDataSourceWriter() throws IOException { + public void testDataSourceWriter() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); String instantTime = "001"; @@ -114,7 +117,7 @@ public void testDataSourceWriter() throws IOException { } @Test - public void testMultipleDataSourceWrites() throws IOException { + public void testMultipleDataSourceWrites() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); int partitionCounter = 0; @@ -158,7 +161,7 @@ public void testMultipleDataSourceWrites() throws IOException { } @Test - public void testLargeWrites() throws IOException { + public void testLargeWrites() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); int partitionCounter = 0; @@ -208,7 +211,7 @@ public void testLargeWrites() throws IOException { * verify only records from batch1 is available to read */ @Test - public void testAbort() throws IOException { + public void testAbort() throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); @@ -274,7 +277,7 @@ public void testAbort() throws IOException { assertOutput(totalInputRows, result, instantTime0); } - private void writeRows(Dataset inputRows, DataWriter writer) throws IOException { + private void writeRows(Dataset inputRows, DataWriter writer) throws Exception { List internalRows = toInternalRows(inputRows, ENCODER); // issue writes for (InternalRow internalRow : internalRows) { diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml new file mode 100644 index 000000000000..7b1ffa977651 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/pom.xml @@ -0,0 +1,163 @@ + + + + + hudi-spark-datasource + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-spark3_2.12 + ${parent.version} + + hudi-spark3_2.12 + jar + + + ${project.parent.parent.basedir} + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.jacoco + jacoco-maven-plugin + + + + + + + org.scala-lang + scala-library + ${scala12.version} + + + + org.apache.spark + spark-sql_2.12 + ${spark3.version} + true + + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + + diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala new file mode 100644 index 000000000000..a0606553ff27 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.client.utils.SparkRowDeserializer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +class Spark3RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer { + + private val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer() + + def deserializeRow(internalRow: InternalRow): Row = { + deserializer.apply(internalRow) + } +} diff --git a/hudi-spark-datasource/pom.xml b/hudi-spark-datasource/pom.xml new file mode 100644 index 000000000000..2301e5f2d988 --- /dev/null +++ b/hudi-spark-datasource/pom.xml @@ -0,0 +1,39 @@ + + + + + hudi + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-spark-datasource + pom + + + ${project.parent.basedir} + + + + hudi-spark-common + hudi-spark + hudi-spark2 + hudi-spark3 + + diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala deleted file mode 100644 index 26babd834b23..000000000000 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hudi.common.model.HoodieRecord -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import scala.collection.JavaConverters._ - - -object HoodieSparkUtils { - - def getMetaSchema: StructType = { - StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { - StructField(col, StringType, nullable = true) - })) - } - - def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { - paths.flatMap(path => { - val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - globPaths - }) - } - - def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = { - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) - } -} diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index ab51475de081..c4a1606fd52f 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -118,6 +118,11 @@ + + org.apache.hudi + hudi-spark-common + ${project.version} + org.apache.hudi hudi-spark_${scala.binary.version} @@ -129,6 +134,16 @@ + + org.apache.hudi + hudi-spark2_${scala.binary.version} + ${project.version} + + + org.apache.hudi + hudi-spark3_2.12 + ${project.version} + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index fb427751ab82..7d5c9a439f96 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -55,7 +55,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -68,6 +67,7 @@ import org.apache.spark.sql.jdbc.JdbcDialect; import org.apache.spark.sql.jdbc.JdbcDialects; import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.LongAccumulator; import java.io.BufferedReader; import java.io.IOException; @@ -292,7 +292,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Strin } public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD writeResponse) { - Accumulator errors = jsc.accumulator(0); + LongAccumulator errors = jsc.sc().longAccumulator(); writeResponse.foreach(writeStatus -> { if (writeStatus.hasErrors()) { errors.add(1); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a27ce996d4bc..e17c1f015651 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -18,8 +18,8 @@ package org.apache.hudi.utilities.deltastreamer; -import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.HoodieSparkUtils; import 
org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -342,7 +342,7 @@ public Pair>> readFromSource( // pass in the schema for the Row-to-Avro conversion // to avoid nullability mismatch between Avro schema and Row schema avroRDDOptional = transformed - .map(t -> AvroConversionUtils.createRdd( + .map(t -> HoodieSparkUtils.createRdd( t, this.userProvidedSchemaProvider.getTargetSchema(), HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); schemaProvider = this.userProvidedSchemaProvider; @@ -356,7 +356,7 @@ public Pair>> readFromSource( UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc))) .orElse(dataAndCheckpoint.getSchemaProvider()); avroRDDOptional = transformed - .map(t -> AvroConversionUtils.createRdd( + .map(t -> HoodieSparkUtils.createRdd( t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); } } else { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java index 505deed6d65e..379cc4b754e2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; @@ -73,8 +74,8 @@ public InputBatch> fetchNewDataInAvroFormat(Optionorg.apache.hudi:hudi-spark-client org.apache.hudi:hudi-utilities_${scala.binary.version} org.apache.hudi:hudi-spark_${scala.binary.version} + org.apache.hudi:hudi-spark2_${scala.binary.version} + org.apache.hudi:hudi-spark3_2.12 org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -339,6 +341,18 @@ ${project.version} + + org.apache.hudi + hudi-spark2_${scala.binary.version} + ${project.version} + + + + org.apache.hudi + hudi-spark3_2.12 + ${project.version} + + org.apache.hadoop hadoop-hdfs diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index e4f4bbceddc5..caa254a11f30 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -66,7 +66,10 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-client-common org.apache.hudi:hudi-spark-client + org.apache.hudi:hudi-spark-common org.apache.hudi:hudi-spark_${scala.binary.version} + org.apache.hudi:hudi-spark2_${scala.binary.version} + org.apache.hudi:hudi-spark3_2.12 org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -220,11 +223,26 @@ hudi-hive-sync ${project.version} + + org.apache.hudi + hudi-spark-common + ${project.version} + org.apache.hudi hudi-spark_${scala.binary.version} ${project.version} + + org.apache.hudi + hudi-spark2_${scala.binary.version} + ${project.version} + + + org.apache.hudi + hudi-spark3_2.12 + ${project.version} + org.apache.hudi hudi-timeline-service diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 39e48bbb50ba..8aec1844d112 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -69,7 +69,10 @@ org.apache.hudi:hudi-client-common 
org.apache.hudi:hudi-spark-client org.apache.hudi:hudi-utilities_${scala.binary.version} + org.apache.hudi:hudi-spark-common org.apache.hudi:hudi-spark_${scala.binary.version} + org.apache.hudi:hudi-spark2_${scala.binary.version} + org.apache.hudi:hudi-spark3_2.12 org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -105,6 +108,7 @@ io.prometheus:simpleclient_common com.yammer.metrics:metrics-core org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} + org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} com.101tec:zkclient org.apache.kafka:kafka-clients @@ -227,11 +231,26 @@ + + org.apache.hudi + hudi-spark-common + ${project.version} + org.apache.hudi hudi-spark_${scala.binary.version} ${project.version} + + org.apache.hudi + hudi-spark2_${scala.binary.version} + ${project.version} + + + org.apache.hudi + hudi-spark3_2.12 + ${project.version} + org.apache.hudi hudi-utilities_${scala.binary.version} diff --git a/pom.xml b/pom.xml index 5ad22596cee9..f4111f7ed3d1 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ hudi-cli hudi-client hudi-hadoop-mr - hudi-spark + hudi-spark-datasource hudi-timeline-service hudi-utilities hudi-sync @@ -84,6 +84,9 @@ 1.8 2.6.7 + 2.6.7.3 + 2.6.7.1 + 2.7.4 2.0.0 2.17 1.10.1 @@ -103,9 +106,12 @@ 4.4.1 2.4.4 1.11.2 + 2.4.4 + 3.0.0 1.8.2 2.11.12 2.11 + 2.12.10 0.12 3.3.1 3.0.1 @@ -432,7 +438,7 @@ com.fasterxml.jackson.core jackson-databind - ${fasterxml.version}.3 + ${fasterxml.jackson.databind.version} com.fasterxml.jackson.datatype @@ -442,7 +448,7 @@ com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - ${fasterxml.version}.1 + ${fasterxml.jackson.module.scala.version} @@ -1318,7 +1324,7 @@ scala-2.12 - 2.12.10 + ${scala12.version} 2.12 @@ -1353,6 +1359,25 @@ + + + spark3 + + ${spark3.version} + ${scala12.version} + 2.12 + 2.4.1 + 2.10.0 + 2.10.0 + 2.10.0 + 2.10.0 + + + + spark3 + + +
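For context on the Spark2RowDeserializer / Spark3RowDeserializer split introduced earlier in this patch: ExpressionEncoder.fromRow was removed in Spark 3, where callers must first obtain an explicit deserializer via createDeserializer, so each module implements the shared SparkRowDeserializer interface against its own Spark API and the right one is pulled in by the hudi-spark2 / hudi-spark3 modules and the spark3 Maven profile. The sketch below shows the Spark 3 flavor end to end against the 3.0 API line used by the new spark3 profile, converting an InternalRow back to a Row; the schema, object name, and values are illustrative, and the Spark 2 equivalent would call encoder.fromRow(internalRow) directly:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    object RowDeserializerSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(
          StructField("_hoodie_record_key", StringType),
          StructField("value", IntegerType)))

        // Spark 3: resolve an explicit deserializer once, then reuse it per row.
        val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind()
        val fromInternal: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer()

        // String columns are stored as UTF8String inside an InternalRow.
        val internal: InternalRow = InternalRow(UTF8String.fromString("key-1"), 42)
        val row: Row = fromInternal(internal) // Row("key-1", 42)
        println(row)

        // Spark 2 equivalent (does not compile against Spark 3):
        //   val row = encoder.fromRow(internal)
      }
    }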