Skip to content

Commit

Permalink
[HUDI-1040] Make Hudi support Spark 3 (apache#2208)
Browse files Browse the repository at this point in the history
* Fix flaky MOR unit test

* Update Spark APIs to make it be compatible with both spark2 & spark3

* Refactor bulk insert v2 part to make Hudi be able to compile with Spark3

* Add spark3 profile to handle fasterxml & spark version

* Create hudi-spark-common module & refactor hudi-spark related modules

Co-authored-by: Wenning Ding <wenningd@amazon.com>
  • Loading branch information
2 people authored and lw309637554 committed Dec 10, 2020
1 parent fba6053 commit 0207b1f
Show file tree
Hide file tree
Showing 79 changed files with 1,040 additions and 172 deletions.
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ This product includes code from Apache Spark

* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package

* org.apache.hudi.HoodieSparkUtils.scala copied some methods from org.apache.spark.deploy.SparkHadoopUtil.scala

Copyright: 2014 and onwards The Apache Software Foundation
Home page: http://spark.apache.org/
License: http://www.apache.org/licenses/LICENSE-2.0
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ The default Scala version supported is 2.11. To build for Scala 2.12 version, bu
mvn clean package -DskipTests -Dscala-2.12
```

### Build with Spark 3.0.0

The default Spark version supported is 2.4.4. To build for Spark 3.0.0 version, build using `spark3` profile

```
mvn clean package -DskipTests -Dspark3
```

### Build without spark-avro module

The default hudi-jar bundles spark-avro module. To build without spark-avro module, build using `spark-shade-unbundle-avro` profile
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

import java.io.Serializable;

public interface SparkRowDeserializer extends Serializable {
Row deserializeRow(InternalRow internalRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,15 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

object AvroConversionUtils {

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace)
}

def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
}

def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = {
df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))
}

def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
if (rdd.isEmpty()) {
ss.emptyDataFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void tearDown() throws Exception {
}

@Test
public void testRowCreateHandle() throws IOException {
public void testRowCreateHandle() throws Exception {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
Expand Down Expand Up @@ -113,7 +113,7 @@ public void testRowCreateHandle() throws IOException {
* should be thrown.
*/
@Test
public void testGlobalFailure() throws IOException {
public void testGlobalFailure() throws Exception {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
Expand Down Expand Up @@ -179,7 +179,8 @@ public void testInstantiationFailure() throws IOException {
}
}

private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle) throws IOException {
private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle)
throws Exception {
List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
// issue writes
for (InternalRow internalRow : internalRows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void tearDown() throws Exception {
}

@Test
public void endToEndTest() throws IOException {
public void endToEndTest() throws Exception {
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
for (int i = 0; i < 5; i++) {
// init write support and parquet config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -84,36 +83,32 @@ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf,
.map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
.collect(Collectors.toList()));

return inputPaths.stream().map(path -> {
setInputPath(jobConf, path);
List<GenericRecord> records = new ArrayList<>();
try {
List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
schema.getFields().stream()
.filter(f -> !projectCols || projectedColumns.contains(f.name()))
.map(f -> Pair.of(projectedSchema.getFields().stream()
.filter(p -> f.name().equals(p.name())).findFirst().get(), f))
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
records.add(newRecord.build());
}
List<GenericRecord> records = new ArrayList<>();
try {
FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());

for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
schema.getFields().stream()
.filter(f -> !projectCols || projectedColumns.contains(f.name()))
.map(f -> Pair.of(projectedSchema.getFields().stream()
.filter(p -> f.name().equals(p.name())).findFirst().get(), f))
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
records.add(newRecord.build());
}
} catch (IOException ie) {
ie.printStackTrace();
}
return records;
}).reduce((a, b) -> {
a.addAll(b);
return a;
}).orElse(new ArrayList<>());
} catch (IOException ie) {
ie.printStackTrace();
}
return records;
}

private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols) {
Expand Down Expand Up @@ -156,10 +151,4 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf
configurable.setConf(conf);
jobConf.addResource(conf);
}

private static void setInputPath(JobConf jobConf, String inputPath) {
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
jobConf.set("map.input.dir", inputPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
Expand All @@ -41,6 +42,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -139,11 +142,11 @@ public static Row getRandomValue(String partitionPath, boolean isError) {
* @param rows Dataset<Row>s to be converted
* @return the List of {@link InternalRow}s thus converted.
*/
public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) {
public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) throws Exception {
List<InternalRow> toReturn = new ArrayList<>();
List<Row> rowList = rows.collectAsList();
for (Row row : rowList) {
toReturn.add(encoder.toRow(row).copy());
toReturn.add(serializeRow(encoder, row).copy());
}
return toReturn;
}
Expand Down Expand Up @@ -173,4 +176,17 @@ public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
.withBulkInsertParallelism(2);
}

private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
// TODO remove reflection if Spark 2.x support is dropped
if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
return (InternalRow) spark2method.invoke(encoder, row);
} else {
Class<?> serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer");
Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder);
Method aboveSpark2method = serializerClass.getMethod("apply", Object.class);
return (InternalRow) aboveSpark2method.invoke(serializer, row);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
break;
}
String content = response.returnContent().asString();
return mapper.readValue(content, reference);
return (T) mapper.readValue(content, reference);
}

private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
Expand Down
8 changes: 1 addition & 7 deletions hudi-integ-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,11 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.7.4</version>
<version>${fasterxml.jackson.dataformat.yaml.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.7.3</version>
</dependency>

<!-- Fasterxml - Test-->
Expand All @@ -220,11 +219,6 @@
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.spark.api.java.JavaRDD;
Expand Down Expand Up @@ -49,7 +49,7 @@ public static JavaRDD<GenericRecord> readAvro(SparkSession sparkSession, String
.option(AVRO_SCHEMA_OPTION_KEY, schemaStr)
.load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq());

return AvroConversionUtils
return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
.toJavaRDD();
Expand All @@ -61,7 +61,7 @@ public static JavaRDD<GenericRecord> readParquet(SparkSession sparkSession, List
Dataset<Row> dataSet = sparkSession.read()
.parquet((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));

return AvroConversionUtils
return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
.toJavaRDD();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public abstract class ITTestBase {
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_SYNC_BUNDLE =
Expand Down
Loading

0 comments on commit 0207b1f

Please sign in to comment.