This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit 308cb58

[NSE-359] [NSE-273] Introduce shim layer to fix compatibility issues for gazelle on spark 3.1 & 3.2 (#742)

* Initial commit

* Add withNewChildInternal & withNewChildrenInternal for spark 3.2 compatibility

* Fix compatibility issues for ParquetFileFormat, etc.

* Fix compatibility issue for ColumnarBatchScanExec

* Fix compatibility issues for ColumnarBroadcastHashJoinExec

* Fix compatibility issues for ColumnarHashAggregateExec

* Fix compatibility issues for ColumnarShuffledHashJoinExec, etc.

* Remove ColumnarScalarSubquery and the code where it is used, not relevant to spark 3.1/3.2 compatibility issues

* Fix compatibility issues for ColumnarShuffleManager, etc.

* Fix compatibility issues for ColumnarArrowPythonRunner etc.

* Set different scala version for spark 3.1/3.2

* Fix compatibility issues for ColumnarCustomShuffleReaderExec.scala

* Fix compatibility issues for ShuffledColumnarBatchRDD

* Refactor the code to re-fix compatibility issues for ColumnarCustomShuffleReaderExec

* Multiple fixes in match/case statements

* Set jackson versions for different versions of spark

* Fix compatibility issues caused by renaming CustomShuffleReaderExec to AQEShuffleReadExec

* Small fixes

* Fix compatibility issues for ReaderIterator

* Move AdaptiveSparkPlanExec & MemoryStore under shim layer

* Fix compatibility issues for ShufflePartitionUtils

* Fix issues found in building

* Fix cyclic dependency and import missing dependencies

* Fix accessibility issues about IndexShuffleBlockResolver, BaseShuffleHandle & SortShuffleWriter

* Fix dependency issue for ColumnarBatchScanExec

* Fix compile issues in spark311 module

* Fix compatibility issues for ReaderIterator

* Move Utils.doFetchFile to specific package and do some code refactor

* Make ColumnarBatchScanExec abstract to break the cyclic dependency

* Use a more concise way to fix compatibility issue for OutputWriter

* Add 3.1 or 3.2 shim layer dependency for NSE core module according to profile

* Convert some child plan to expected type

* Fix compile issues on spark 3.2.0

* Change the extension from ShuffledJoin to ColumnarShuffledJoin for ColumnarBHJ/SHJ

* Remove useless code

* Change spark version to 3.2.1 for profile spark-3.2

* Fix compatibility issues from spark 3.2.0 to 3.2.1

* Fix dependency issue for github action test

* Put fully-qualified class name under resources/META-INF/services for spark3.2 shim
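
The shim layer introduced by this commit is resolved at run time: each Spark-version module (spark311, spark321) registers its implementation by fully-qualified class name under resources/META-INF/services, and callers reach it through SparkShimLoader.getSparkShims (see the diffs below). A minimal sketch of that service-loader pattern, using hypothetical trait and provider names beyond what this commit shows:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Sketch only: the real trait lives in spark-sql-columnar-shims-common and has far more members.
trait SparkShims {
  def getShimDescriptor: String
}

// Hypothetical provider interface; each shim module registers its implementation's
// fully-qualified class name under META-INF/services so it can be discovered at run time.
trait SparkShimProvider {
  def matches(sparkVersion: String): Boolean
  def createShim: SparkShims
}

object SparkShimLoader {
  // Resolve the shim once, based on the Spark version actually on the classpath.
  private lazy val shims: SparkShims = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    ServiceLoader.load(classOf[SparkShimProvider]).asScala
      .find(_.matches(sparkVersion))
      .map(_.createShim)
      .getOrElse(throw new IllegalStateException(s"No shim layer found for Spark $sparkVersion"))
  }

  def getSparkShims: SparkShims = shims
}
```

With this indirection the core module compiles only against spark-sql-columnar-shims-common (declared with provided scope below), and the Maven profile decides whether the spark311 or spark321 implementation ends up on the classpath.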
PHILO-HE authored Mar 9, 2022
1 parent ca9f2a5 commit 308cb58
Showing 61 changed files with 1,700 additions and 367 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/unittests.yml
@@ -97,9 +97,7 @@ jobs:
- name: Run unit tests
run: |
mvn clean install -N
cd arrow-data-source
mvn clean install -DskipTests -Dbuild_arrow=OFF
cd ..
mvn clean install -DskipTests -Dbuild_arrow=OFF -pl arrow-data-source
mvn clean package -P full-scala-compiler -Phadoop-2.7.4 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7" &> log-file.log
echo '#!/bin/bash' > grep.sh
@@ -144,9 +142,7 @@ jobs:
- name: Run unit tests
run: |
mvn clean install -N
cd arrow-data-source
mvn clean install -DskipTests -Dbuild_arrow=OFF
cd ..
mvn clean install -DskipTests -Dbuild_arrow=OFF -pl arrow-data-source
mvn clean package -P full-scala-compiler -Phadoop-3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop3.2" &> log-file.log
echo '#!/bin/bash' > grep.sh
@@ -310,4 +310,8 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
}
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): RowToArrowColumnarExec =
copy(child = newChild)
}
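
Most of the operator changes in this commit follow the same pattern as the RowToArrowColumnarExec hunk above: Spark 3.2 requires plan nodes with children to supply withNewChildInternal (or withNewChildrenInternal), members that do not exist in Spark 3.1. Declaring them without the override keyword lets one source tree compile against both versions. A minimal sketch with a hypothetical operator, not one from this repository:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical pass-through operator, shown only to illustrate the compatibility pattern.
case class MyColumnarPassThroughExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] = child.execute()

  // On Spark 3.2 this implements the new abstract member; because it is declared
  // without `override`, the same source also compiles on Spark 3.1, where no such
  // member exists.
  protected def withNewChildInternal(newChild: SparkPlan): MyColumnarPassThroughExec =
    copy(child = newChild)
}
```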
6 changes: 6 additions & 0 deletions arrow-data-source/parquet/pom.xml
@@ -22,6 +22,12 @@
<artifactId>spark-arrow-datasource-standard</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.util.{Failure, Try}

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat
import com.intel.oap.sql.shims.SparkShimLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
@@ -274,6 +275,7 @@ class ParquetFileFormat
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -292,11 +294,17 @@

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData

val datetimeRebaseMode =
SparkShimLoader.getSparkShims.getDatetimeRebaseMode(footerFileMetaData, parquetOptions)

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters =
SparkShimLoader.getSparkShims.newParquetFilters(parquetSchema: MessageType,
pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith,
pushDownInFilterThreshold, isCaseSensitive, footerFileMetaData, parquetOptions)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -322,10 +330,6 @@
None
}

val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
@@ -337,12 +341,14 @@
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
"",
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
val vectorizedReader = SparkShimLoader.getSparkShims
.newVectorizedParquetRecordReader(
convertTz.orNull,
footerFileMetaData,
parquetOptions,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)

val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
@@ -358,8 +364,8 @@
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz, enableVectorizedReader = false, datetimeRebaseMode, SQLConf.LegacyBehaviorPolicy.LEGACY)
val readSupport = SparkShimLoader.getSparkShims.newParquetReadSupport(
convertTz, false, footerFileMetaData, parquetOptions)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
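
The Parquet read-path differences are likewise pushed behind the shim: the version-specific constructor calls above are replaced with SparkShimLoader.getSparkShims.newParquetFilters / newVectorizedParquetRecordReader / newParquetReadSupport / getDatetimeRebaseMode. As a rough illustration, the Spark 3.1 side of getDatetimeRebaseMode can simply reproduce the inline call this commit removes; the trait below is a simplified stand-in (the real hook also receives the ParquetOptions, and its exact signatures may differ):

```scala
import org.apache.parquet.hadoop.metadata.FileMetaData

import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf

// Simplified stand-in for the shim's datetime-rebase hook.
trait DatetimeRebaseShim {
  def getDatetimeRebaseMode(fileMetaData: FileMetaData): SQLConf.LegacyBehaviorPolicy.Value
}

// Spark 3.1 flavour: the same logic as the inline code removed from ParquetFileFormat above.
// If any of these helpers turn out to be package-private in a given Spark build, the
// implementation can be placed under an org.apache.spark.sql.* package instead.
class Spark311DatetimeRebaseShim extends DatetimeRebaseShim {
  override def getDatetimeRebaseMode(
      fileMetaData: FileMetaData): SQLConf.LegacyBehaviorPolicy.Value =
    DataSourceUtils.datetimeRebaseMode(
      fileMetaData.getKeyValueMetaData.get,
      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
}
```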
2 changes: 1 addition & 1 deletion arrow-data-source/pom.xml
@@ -42,7 +42,7 @@
</pluginRepositories>

<dependencies>
<dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
6 changes: 6 additions & 0 deletions arrow-data-source/standard/pom.xml
@@ -18,6 +18,12 @@
<artifactId>spark-arrow-datasource-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -137,6 +137,10 @@ object ArrowWriteExtension
private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: LogicalPlan): ColumnarToFakeRowLogicAdaptor =
copy(child = newChild)
}

private case class ColumnarToFakeRowAdaptor(child: SparkPlan) extends ColumnarToRowTransition {
@@ -149,6 +153,10 @@ object ArrowWriteExtension {
}

override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarToFakeRowAdaptor =
copy(child = newChild)
}

case class SimpleStrategy() extends Strategy {
@@ -94,6 +94,11 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
override def close(): Unit = {
writeQueue.close()
}

// Do NOT add override keyword for compatibility on spark 3.1.
def path(): String = {
path
}
}
}
}
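
The path() accessor above is deliberately declared without override: Spark 3.2's OutputWriter evidently adds an abstract path(): String member while Spark 3.1's has none, and in Scala implementing an abstract member needs no override keyword, whereas marking override on a member the parent does not declare fails to compile. A small self-contained illustration with stand-in parents (not Spark classes):

```scala
// Stand-in parents that only model the relevant difference between the two Spark versions.
abstract class Writer31                           // "3.1-style": no path member
abstract class Writer32 { def path(): String }    // "3.2-style": abstract path()

class MyWriter31(p: String) extends Writer31 {
  def path(): String = p  // on 3.1 this is simply an extra method
}

class MyWriter32(p: String) extends Writer32 {
  def path(): String = p  // implements the abstract member; `override` is not required
}

// Writing `override def path(): String` instead would break the 3.1 case
// ("method path overrides nothing"), so the writer in the diff omits the keyword
// and the same source builds against either version.
```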
35 changes: 31 additions & 4 deletions native-sql-engine/core/pom.xml
@@ -44,6 +44,33 @@
<nativesql.build_protobuf>${build_protobuf}</nativesql.build_protobuf>
<nativesql.build_jemalloc>${build_jemalloc}</nativesql.build_jemalloc>
</properties>

<profiles>
<profile>
<id>spark-3.1.1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark311</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark321</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
<dependency>
@@ -166,19 +193,19 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -299,7 +326,7 @@
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -24,7 +24,7 @@
import java.util.*;
import java.util.stream.Collectors;

import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;
import org.apache.spark.sql.execution.datasources.VectorizedParquetRecordReaderChild;
import com.intel.oap.datasource.parquet.ParquetReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
@@ -46,7 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VectorizedParquetArrowReader extends VectorizedParquetRecordReader {
public class VectorizedParquetArrowReader extends VectorizedParquetRecordReaderChild {
private static final Logger LOG =
LoggerFactory.getLogger(VectorizedParquetArrowReader.class);
private ParquetReader reader = null;
@@ -70,7 +70,8 @@ public class VectorizedParquetArrowReader extends VectorizedParquetRecordReader

public VectorizedParquetArrowReader(String path, ZoneId convertTz, boolean useOffHeap,
int capacity, StructType sourceSchema, StructType readDataSchema, String tmp_dir) {
super(convertTz, "CORRECTED", "LEGACY", useOffHeap, capacity);
// TODO: datetimeRebaseTz & int96RebaseTz are set to ""; need to check the impact.
super(convertTz, "CORRECTED", "", "LEGACY", "", useOffHeap, capacity);
this.capacity = capacity;
this.path = path;
this.tmp_dir = tmp_dir;
@@ -147,6 +147,10 @@ case class CoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
new CloseableColumnBatchIterator(res)
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): CoalesceBatchesExec =
copy(child = newChild)
}

object CoalesceBatchesExec {
@@ -100,8 +100,11 @@ case class ColumnarConditionProjectExec(
}
}

def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
// In Spark 3.2, PredicateHelper already defines isNullIntolerant with exactly the same
// code. Reusing that name would require the override keyword on 3.2, while on Spark 3.1
// there is no such method to override, so an independent method name is used instead.
def isNullIntolerantInternal(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerantInternal)
case _ => false
}

@@ -110,7 +113,7 @@

val notNullAttributes = if (condition != null) {
val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
case IsNotNull(a) => isNullIntolerantInternal(a) && a.references.subsetOf(child.outputSet)
case _ => false
}
notNullPreds.flatMap(_.references).distinct.map(_.exprId)
@@ -267,6 +270,9 @@ case class ColumnarConditionProjectExec(
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarConditionProjectExec =
copy(child = newChild)
}

case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
@@ -308,6 +314,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

// For spark 3.2.
protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): ColumnarUnionExec =
copy(children = newChildren)
}

//TODO(): consolidate locallimit and globallimit
@@ -380,6 +390,10 @@ case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExe
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarLocalLimitExec =
copy(child = newChild)

}

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
@@ -451,4 +465,8 @@ case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitEx
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarGlobalLimitExec =
copy(child = newChild)
}
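
The isNullIntolerantInternal rename earlier in this file is the mirror image of the withNewChildInternal trick: there the Spark 3.2 parent member is abstract, so omitting override works on both versions; here Spark 3.2's PredicateHelper ships a concrete isNullIntolerant, so keeping the same name would demand override on 3.2 and reject it on 3.1. No single spelling compiles against both, hence the new name. A toy illustration with stand-in traits (not Spark's):

```scala
// Stand-in traits that only model the relevant difference between the two Spark versions.
trait Helper31                                                            // "3.1-style": no member
trait Helper32 { def isNullIntolerant(e: String): Boolean = e.nonEmpty }  // "3.2-style": concrete member

class Op31 extends Helper31 {
  // Compiles against the 3.1-style parent, but the same definition against Helper32
  // would fail: redefining an inherited concrete member requires `override`.
  def isNullIntolerant(e: String): Boolean = e.nonEmpty
}

class Op32 extends Helper32 {
  // Compiles against the 3.2-style parent, but the same definition against Helper31
  // would fail: there is nothing to override.
  override def isNullIntolerant(e: String): Boolean = e.nonEmpty
}

// Neither spelling works for both parents, so the operator simply uses an unrelated name:
class OpBoth extends Helper32 {
  def isNullIntolerantInternal(e: String): Boolean = e.nonEmpty
}
```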
