From 4177292dcd65f77679e74365dc627a506812ddbb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 11 Apr 2019 20:03:32 +0800 Subject: [PATCH 001/181] [SPARK-27435][SQL] Support schema pruning in ORC V2 ## What changes were proposed in this pull request? Currently, the optimization rule `SchemaPruning` only works for Parquet/Orc V1. We should have the same optimization in ORC V2. ## How was this patch tested? Unit test Closes #24338 from gengliangwang/schemaPruningForV2. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- .../execution/datasources/SchemaPruning.scala | 104 ++++++++++++++---- .../datasources/SchemaPruningSuite.scala | 4 +- ...te.scala => OrcV1SchemaPruningSuite.scala} | 2 +- .../orc/OrcV2SchemaPruningSuite.scala | 52 +++++++++ 4 files changed, 139 insertions(+), 23 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/{OrcSchemaPruningSuite.scala => OrcV1SchemaPruningSuite.scala} (95%) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala index 3a37ca7c048e4..15fdf65ee8974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} +import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} @@ -48,7 +51,7 @@ object SchemaPruning extends Rule[LogicalPlan] { l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _)) if canPruneRelation(hadoopFsRelation) => val (normalizedProjects, normalizedFilters) = - normalizeAttributeRefNames(l, projects, filters) + normalizeAttributeRefNames(l.output, projects, filters) val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) // If requestedRootFields includes a nested field, continue. Otherwise, @@ -76,6 +79,43 @@ object SchemaPruning extends Rule[LogicalPlan] { } else { op } + + case op @ PhysicalOperation(projects, filters, + d @ DataSourceV2Relation(table: FileTable, output, _)) if canPruneTable(table) => + val (normalizedProjects, normalizedFilters) = + normalizeAttributeRefNames(output, projects, filters) + val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) + + // If requestedRootFields includes a nested field, continue. 
Otherwise, + // return op + if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) { + val dataSchema = table.dataSchema + val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return op. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in prunedDataSchema are a subset of the fields + // in dataSchema. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { + val prunedFileTable = table match { + case o: OrcTable => o.copy(userSpecifiedSchema = Some(prunedDataSchema)) + case _ => + val message = s"${table.formatName} data source doesn't support schema pruning." + throw new AnalysisException(message) + } + + + val prunedRelationV2 = buildPrunedRelationV2(d, prunedFileTable) + val projectionOverSchema = ProjectionOverSchema(prunedDataSchema) + + buildNewProjection(normalizedProjects, normalizedFilters, prunedRelationV2, + projectionOverSchema) + } else { + op + } + } else { + op + } } /** @@ -85,16 +125,22 @@ object SchemaPruning extends Rule[LogicalPlan] { fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] || fsRelation.fileFormat.isInstanceOf[OrcFileFormat] + /** + * Checks to see if the given [[FileTable]] can be pruned. Currently we support ORC v2. + */ + private def canPruneTable(table: FileTable) = + table.isInstanceOf[OrcTable] + /** * Normalizes the names of the attribute references in the given projects and filters to reflect * the names in the given logical relation. This makes it possible to compare attributes and * fields by name. Returns a tuple with the normalized projects and filters, respectively. */ private def normalizeAttributeRefNames( - logicalRelation: LogicalRelation, + output: Seq[AttributeReference], projects: Seq[NamedExpression], filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = { - val normalizedAttNameMap = logicalRelation.output.map(att => (att.exprId, att.name)).toMap + val normalizedAttNameMap = output.map(att => (att.exprId, att.name)).toMap val normalizedProjects = projects.map(_.transform { case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) => att.withName(normalizedAttNameMap(att.exprId)) @@ -107,11 +153,13 @@ object SchemaPruning extends Rule[LogicalPlan] { } /** - * Builds the new output [[Project]] Spark SQL operator that has the pruned output relation. + * Builds the new output [[Project]] Spark SQL operator that has the `leafNode`. 
*/ private def buildNewProjection( - projects: Seq[NamedExpression], filters: Seq[Expression], prunedRelation: LogicalRelation, - projectionOverSchema: ProjectionOverSchema) = { + projects: Seq[NamedExpression], + filters: Seq[Expression], + leafNode: LeafNode, + projectionOverSchema: ProjectionOverSchema): Project = { // Construct a new target for our projection by rewriting and // including the original filters where available val projectionChild = @@ -120,9 +168,9 @@ object SchemaPruning extends Rule[LogicalPlan] { case projectionOverSchema(expr) => expr }) val newFilterCondition = projectedFilters.reduce(And) - Filter(newFilterCondition, prunedRelation) + Filter(newFilterCondition, leafNode) } else { - prunedRelation + leafNode } // Construct the new projections of our Project by @@ -145,20 +193,36 @@ object SchemaPruning extends Rule[LogicalPlan] { private def buildPrunedRelation( outputRelation: LogicalRelation, prunedBaseRelation: HadoopFsRelation) = { + val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema) + outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput) + } + + /** + * Builds a pruned data source V2 relation from the output of the relation and the schema + * of the pruned [[FileTable]]. + */ + private def buildPrunedRelationV2( + outputRelation: DataSourceV2Relation, + prunedFileTable: FileTable) = { + val prunedOutput = getPrunedOutput(outputRelation.output, prunedFileTable.schema) + outputRelation.copy(table = prunedFileTable, output = prunedOutput) + } + + // Prune the given output to make it consistent with `requiredSchema`. + private def getPrunedOutput( + output: Seq[AttributeReference], + requiredSchema: StructType): Seq[AttributeReference] = { // We need to replace the expression ids of the pruned relation output attributes // with the expression ids of the original relation output attributes so that // references to the original relation's output are not broken - val outputIdMap = outputRelation.output.map(att => (att.name, att.exprId)).toMap - val prunedRelationOutput = - prunedBaseRelation - .schema - .toAttributes - .map { - case att if outputIdMap.contains(att.name) => - att.withExprId(outputIdMap(att.name)) - case att => att - } - outputRelation.copy(relation = prunedBaseRelation, output = prunedRelationOutput) + val outputIdMap = output.map(att => (att.name, att.exprId)).toMap + requiredSchema + .toAttributes + .map { + case att if outputIdMap.contains(att.name) => + att.withExprId(outputIdMap(att.name)) + case att => att + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index 22317fe8d13ad..09ca42851836b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -407,7 +407,7 @@ abstract class SchemaPruningSuite } } - private val schemaEquality = new Equality[StructType] { + protected val schemaEquality = new Equality[StructType] { override def areEqual(a: StructType, b: Any): Boolean = b match { case otherType: StructType => a.sameType(otherType) @@ -422,7 +422,7 @@ abstract class SchemaPruningSuite df.collect() } - private def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { + protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { 
val fileSourceScanSchemata = df.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan.requiredSchema diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala similarity index 95% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSchemaPruningSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala index 2623bf9433681..832da59a78ed8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.execution.datasources.SchemaPruningSuite import org.apache.spark.sql.internal.SQLConf -class OrcSchemaPruningSuite extends SchemaPruningSuite { +class OrcV1SchemaPruningSuite extends SchemaPruningSuite { override protected val dataSourceName: String = "orc" override protected val vectorizedReaderEnabledKey: String = SQLConf.ORC_VECTORIZED_READER_ENABLED.key diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala new file mode 100644 index 0000000000000..b042f7f9d7160 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.spark.SparkConf +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.execution.datasources.SchemaPruningSuite +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan +import org.apache.spark.sql.internal.SQLConf + +class OrcV2SchemaPruningSuite extends SchemaPruningSuite { + override protected val dataSourceName: String = "orc" + override protected val vectorizedReaderEnabledKey: String = + SQLConf.ORC_VECTORIZED_READER_ENABLED.key + + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "") + + override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { + val fileSourceScanSchemata = + df.queryExecution.executedPlan.collect { + case BatchScanExec(_, scan: OrcScan) => scan.readDataSchema + } + assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, + s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + + s"but expected $expectedSchemaCatalogStrings") + fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach { + case (scanSchema, expectedScanSchemaCatalogString) => + val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString) + implicit val equality = schemaEquality + assert(scanSchema === expectedScanSchema) + } + } +} From d33ae2e9edda08313f5a430a6d162ad23e95a7cb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 11 Apr 2019 07:58:57 -0700 Subject: [PATCH 002/181] [SPARK-26953][CORE][TEST] Disable result checking in the test: java.lang.ArrayIndexOutOfBoundsException in TimSort ## What changes were proposed in this pull request? I propose to disable (comment) result checking in `SorterSuite`.`java.lang.ArrayIndexOutOfBoundsException in TimSort` because: 1. The check is optional, and correctness of TimSort is checked by another tests. Purpose of the test is to check that TimSort doesn't fail with `ArrayIndexOutOfBoundsException`. 2. Significantly drops execution time of the test. Here are timing of running the test locally: ``` Sort: 1.4 seconds Result checking: 15.6 seconds ``` ## How was this patch tested? By `SorterSuite`. Closes #24343 from MaxGekk/timsort-test-speedup. Authored-by: Maxim Gekk Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/util/collection/SorterSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index acd0b0e295551..e80bd96c982df 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -154,6 +154,7 @@ class SorterSuite extends SparkFunSuite with Logging { // The sort must finish without ArrayIndexOutOfBoundsException // The arrayToSort contains runLengths.length elements of 1, and others are 0. // Those 1 must be placed at the end of arrayToSort after sorting. + /* var i: Int = 0 sum = 0 val amountOfZeros = arrayToSort.length - runLengths.length @@ -169,6 +170,7 @@ class SorterSuite extends SparkFunSuite with Logging { i += 1 } while (i < sizeOfArrayToSort) assert(sum === runLengths.length) + */ } /** Runs an experiment several times. 
*/

From 239082d9667a4fa4198bd9524d63c739df147e0e Mon Sep 17 00:00:00 2001
From: s71955
Date: Thu, 11 Apr 2019 08:53:00 -0700
Subject: [PATCH 003/181] [SPARK-27403][SQL] Fix `updateTableStats` to update
 table stats always with new stats or None

## What changes were proposed in this pull request?
The system should update table statistics automatically when the user sets
spark.sql.statistics.size.autoUpdate.enabled to true; currently this property has no effect,
whether it is enabled or disabled. The feature is similar to Hive's auto-gather feature, where
statistics are computed automatically by default once it is enabled.
Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev

As part of the fix, the autoSizeUpdateEnabled check is performed first, so that the system
calculates the table size automatically and records it in the metastore, as the user expects.

## How was this patch tested?
A unit test was added and the change was verified manually on a cluster (unit tests plus some
internal tests on a real cluster).

Before fix:
![image](https://user-images.githubusercontent.com/12999161/55688682-cd8d4780-5998-11e9-85da-e1a4e34419f6.png)

After fix:
![image](https://user-images.githubusercontent.com/12999161/55688654-7d15ea00-5998-11e9-973f-1f4cee27018f.png)

Closes #24315 from sujith71955/master_autoupdate.

Authored-by: s71955
Signed-off-by: Dongjoon Hyun
---
 .../sql/execution/command/CommandUtils.scala  | 18 ++++++++---------
 .../spark/sql/StatisticsCollectionSuite.scala | 20 +++++++++++++++++++
 2 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index dea1e017b2e51..70e7cd9a1e40d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -42,16 +42,14 @@ object CommandUtils extends Logging {
 
   /** Change statistics after changing data by commands. 
*/ def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = { - if (table.stats.nonEmpty) { - val catalog = sparkSession.sessionState.catalog - if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { - val newTable = catalog.getTableMetadata(table.identifier) - val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable) - val newStats = CatalogStatistics(sizeInBytes = newSize) - catalog.alterTableStats(table.identifier, Some(newStats)) - } else { - catalog.alterTableStats(table.identifier, None) - } + val catalog = sparkSession.sessionState.catalog + if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { + val newTable = catalog.getTableMetadata(table.identifier) + val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable) + val newStats = CatalogStatistics(sizeInBytes = newSize) + catalog.alterTableStats(table.identifier, Some(newStats)) + } else if (table.stats.nonEmpty) { + catalog.alterTableStats(table.identifier, None) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 90b3586712a8c..4e1e42488b580 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -337,6 +337,26 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } + test("auto gather stats after insert command") { + val table = "change_stats_insert_datasource_table" + Seq(false, true).foreach { autoUpdate => + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) { + withTable(table) { + sql(s"CREATE TABLE $table (i int, j string) USING PARQUET") + // insert into command + sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'") + val stats = getCatalogTable(table).stats + if (autoUpdate) { + assert(stats.isDefined) + assert(stats.get.sizeInBytes >= 0) + } else { + assert(stats.isEmpty) + } + } + } + } + } + test("invalidation of tableRelationCache after inserts") { val table = "invalidate_catalog_cache_table" Seq(false, true).foreach { autoUpdate => From 43da473c1c49a99a038f558c64b5b34eb2fec764 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Fri, 12 Apr 2019 00:14:37 +0800 Subject: [PATCH 004/181] [SPARK-27225][SQL] Implement join strategy hints ## What changes were proposed in this pull request? This PR extends the existing BROADCAST join hint (for both broadcast-hash join and broadcast-nested-loop join) by implementing other join strategy hints corresponding to the rest of Spark's existing join strategies: shuffle-hash, sort-merge, cartesian-product. The hint names: SHUFFLE_MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL are partly different from the code names in order to make them clearer to users and reflect the actual algorithms better. The hinted strategy will be used for the join with which it is associated if it is applicable/doable. Conflict resolving rules in case of multiple hints: 1. Conflicts within either side of the join: take the first strategy hint specified in the query, or the top hint node in Dataset. For example, in "select /*+ merge(t1) */ /*+ broadcast(t1) */ k1, v2 from t1 join t2 on t1.k1 = t2.k2", take "merge(t1)"; in ```df1.hint("merge").hint("shuffle_hash").join(df2)```, take "shuffle_hash". This is a general hint conflict resolving strategy, not specific to join strategy hint. 2. 
Conflicts between two sides of the join: a) In case of different strategy hints, hints are prioritized as ```BROADCAST``` over ```SHUFFLE_MERGE``` over ```SHUFFLE_HASH``` over ```SHUFFLE_REPLICATE_NL```. b) In case of same strategy hints but conflicts in build side, choose the build side based on join type and size. ## How was this patch tested? Added new UTs. Closes #24164 from maryannxue/join-hints. Lead-authored-by: maryannxue Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- docs/sql-performance-tuning.md | 24 +- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/ResolveHints.scala | 92 ++-- .../sql/catalyst/catalog/interface.scala | 2 +- .../optimizer/EliminateResolvedHint.scala | 52 ++- .../sql/catalyst/plans/logical/hints.scala | 95 ++++- .../catalyst/analysis/ResolveHintsSuite.scala | 47 ++- .../spark/sql/execution/SparkStrategies.scala | 366 +++++++++------- .../org/apache/spark/sql/functions.scala | 4 +- .../apache/spark/sql/CachedTableSuite.scala | 4 +- .../org/apache/spark/sql/JoinHintSuite.scala | 396 ++++++++++++++++-- .../execution/joins/BroadcastJoinSuite.scala | 5 +- 12 files changed, 837 insertions(+), 252 deletions(-) diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md index 68569744afcc1..2a1edda84252c 100644 --- a/docs/sql-performance-tuning.md +++ b/docs/sql-performance-tuning.md @@ -107,14 +107,22 @@ that these options will be deprecated in future release as more optimizations ar -## Broadcast Hint for SQL Queries - -The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view. -When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, -even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`. -When both sides of a join are specified, Spark broadcasts the one having the lower statistics. -Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) -support BHJ. When the broadcast nested loop join is selected, we still respect the hint. +## Join Strategy Hints for SQL Queries + +The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`, +instruct Spark to use the hinted strategy on each specified relation when joining them with another +relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either +broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key) +with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested +by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`. + +When different join strategy hints are specified on both sides of a join, Spark prioritizes the +`BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL` +hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will +pick the build side based on the join type and the sizes of the relations. + +Note that there is no guarantee that Spark will choose the join strategy specified in the hint since +a specific strategy may not support all join types.
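As an illustration of the hint syntax documented above (an editor-added sketch, not part of this patch), the following self-contained Scala snippet shows both the SQL-comment form and the Dataset API form. The table names `t1`/`t2`, their columns, and the local SparkSession setup are made up for the example; the hint names and the precedence rules are the ones described in this change.

```scala
import org.apache.spark.sql.SparkSession

object JoinStrategyHintExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JoinStrategyHintExample")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical tables used only for this sketch.
    val t1 = Seq((1, "a"), (2, "b")).toDF("k1", "v1")
    val t2 = Seq((1, "x"), (3, "y")).toDF("k2", "v2")
    t1.createOrReplaceTempView("t1")
    t2.createOrReplaceTempView("t2")

    // SQL hint syntax: request a shuffle sort merge join, with t1 as the hinted relation.
    spark.sql(
      "SELECT /*+ MERGE(t1) */ t1.k1, t2.v2 FROM t1 JOIN t2 ON t1.k1 = t2.k2").explain()

    // Dataset hint syntax: a parameterless hint marks the whole plan on that side.
    // When several strategy hints are attached to the same side, the top hint node
    // wins, so this requests a shuffle hash join rather than a sort merge join.
    t1.hint("merge").hint("shuffle_hash").join(t2, $"k1" === $"k2").explain()

    spark.stop()
  }
}
```

Running `explain()` on these two queries should show a sort merge join and a shuffle hash join respectively, assuming the hinted strategies are applicable to the join type, per the priority rules above.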
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 01e40e64a3e8d..02d83e7e8cb65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -153,7 +153,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, - new ResolveHints.ResolveBroadcastHints(conf), + new ResolveHints.ResolveJoinStrategyHints(conf), ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index dbd4ed845e329..9440a3f806b4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.IntegerLiteral import org.apache.spark.sql.catalyst.plans.logical._ @@ -28,45 +30,66 @@ import org.apache.spark.sql.internal.SQLConf /** - * Collection of rules related to hints. The only hint currently available is broadcast join hint. + * Collection of rules related to hints. The only hint currently available is join strategy hint. * * Note that this is separately into two rules because in the future we might introduce new hint - * rules that have different ordering requirements from broadcast. + * rules that have different ordering requirements from join strategies. */ object ResolveHints { /** - * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of - * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted - * on top of any relation (that is not aliased differently), subquery, or common table expression - * that match the specified name. + * The list of allowed join strategy hints is defined in [[JoinStrategyHint.strategies]], and a + * sequence of relation aliases can be specified with a join strategy hint, e.g., "MERGE(a, c)", + * "BROADCAST(a)". A join strategy hint plan node will be inserted on top of any relation (that + * is not aliased differently), subquery, or common table expression that match the specified + * name. * * The hint resolution works by recursively traversing down the query plan to find a relation or - * subquery that matches one of the specified broadcast aliases. The traversal does not go past - * beyond any existing broadcast hints, subquery aliases. + * subquery that matches one of the specified relation aliases. The traversal does not go past + * beyond any view reference, with clause or subquery alias. * * This rule must happen before common table expressions. 
*/ - class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { - private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") + class ResolveJoinStrategyHints(conf: SQLConf) extends Rule[LogicalPlan] { + private val STRATEGY_HINT_NAMES = JoinStrategyHint.strategies.flatMap(_.hintAliases) def resolver: Resolver = conf.resolver - private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { + private def createHintInfo(hintName: String): HintInfo = { + HintInfo(strategy = + JoinStrategyHint.strategies.find( + _.hintAliases.map( + _.toUpperCase(Locale.ROOT)).contains(hintName.toUpperCase(Locale.ROOT)))) + } + + private def applyJoinStrategyHint( + plan: LogicalPlan, + relations: mutable.HashSet[String], + hintName: String): LogicalPlan = { // Whether to continue recursing down the tree var recurse = true val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { - case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) => - ResolvedHint(plan, HintInfo(broadcast = true)) - case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) => - ResolvedHint(plan, HintInfo(broadcast = true)) + case ResolvedHint(u: UnresolvedRelation, hint) + if relations.exists(resolver(_, u.tableIdentifier.table)) => + relations.remove(u.tableIdentifier.table) + ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) + case ResolvedHint(r: SubqueryAlias, hint) + if relations.exists(resolver(_, r.alias)) => + relations.remove(r.alias) + ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) + + case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) => + relations.remove(u.tableIdentifier.table) + ResolvedHint(plan, createHintInfo(hintName)) + case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) => + relations.remove(r.alias) + ResolvedHint(plan, createHintInfo(hintName)) case _: ResolvedHint | _: View | _: With | _: SubqueryAlias => // Don't traverse down these nodes. - // For an existing broadcast hint, there is no point going down (if we do, we either - // won't change the structure, or will introduce another broadcast hint that is useless. + // For an existing strategy hint, there is no chance for a match from this point down. // The rest (view, with, subquery) indicates different scopes that we shouldn't traverse // down. Note that technically when this rule is executed, we haven't completed view // resolution yet and as a result the view part should be deadcode. I'm leaving it here @@ -80,25 +103,38 @@ object ResolveHints { } if ((plan fastEquals newNode) && recurse) { - newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast)) + newNode.mapChildren(child => applyJoinStrategyHint(child, relations, hintName)) } else { newNode } } + private def handleOverriddenHintInfo(hint: HintInfo): Unit = { + logWarning(s"Join hint $hint is overridden by another hint and will not take effect.") + } + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => + case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { - // If there is no table alias specified, turn the entire subtree into a BroadcastHint. - ResolvedHint(h.child, HintInfo(broadcast = true)) + // If there is no table alias specified, apply the hint on the entire subtree. 
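+          // (e.g., a Dataset-side hint such as df.hint("broadcast") passes no
+          // relation names, so it takes this branch.)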
+ ResolvedHint(h.child, createHintInfo(h.name)) } else { - // Otherwise, find within the subtree query plans that should be broadcasted. - applyBroadcastHint(h.child, h.parameters.map { + // Otherwise, find within the subtree query plans to apply the hint. + val relationNames = h.parameters.map { case tableName: String => tableName case tableId: UnresolvedAttribute => tableId.name - case unsupported => throw new AnalysisException("Broadcast hint parameter should be " + - s"an identifier or string but was $unsupported (${unsupported.getClass}") - }.toSet) + case unsupported => throw new AnalysisException("Join strategy hint parameter " + + s"should be an identifier or string but was $unsupported (${unsupported.getClass}") + } + val relationNameSet = new mutable.HashSet[String] + relationNames.foreach(relationNameSet.add) + + val applied = applyJoinStrategyHint(h.child, relationNameSet, h.name) + relationNameSet.foreach { n => + logWarning(s"Count not find relation '$n' for join strategy hint " + + s"'${h.name}${relationNames.mkString("(", ", ", ")")}'.") + } + applied } } } @@ -135,7 +171,9 @@ object ResolveHints { */ object RemoveAllHints extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - case h: UnresolvedHint => h.child + case h: UnresolvedHint => + logWarning(s"Unrecognized hint: ${h.name}${h.parameters.mkString("(", ", ", ")")}") + h.child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 60066371e3dc2..2d646721f87a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -374,7 +374,7 @@ object CatalogTable { /** * This class of statistics is used in [[CatalogTable]] to interact with metastore. * We define this new class instead of directly using [[Statistics]] here because there are no - * concepts of attributes or broadcast hint in catalog. + * concepts of attributes in catalog. */ case class CatalogStatistics( sizeInBytes: BigInt, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala index a136f0493699e..5586690520c28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala @@ -30,30 +30,58 @@ object EliminateResolvedHint extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { val pulledUp = plan transformUp { case j: Join => - val leftHint = mergeHints(collectHints(j.left)) - val rightHint = mergeHints(collectHints(j.right)) - j.copy(hint = JoinHint(leftHint, rightHint)) + val (newLeft, leftHints) = extractHintsFromPlan(j.left) + val (newRight, rightHints) = extractHintsFromPlan(j.right) + val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints)) + j.copy(left = newLeft, right = newRight, hint = newJoinHint) } pulledUp.transformUp { - case h: ResolvedHint => h.child + case h: ResolvedHint => + handleInvalidHintInfo(h.hints) + h.child } } + /** + * Combine a list of [[HintInfo]]s into one [[HintInfo]]. 
+ */ private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = { - hints.reduceOption((h1, h2) => HintInfo( - broadcast = h1.broadcast || h2.broadcast)) + hints.reduceOption((h1, h2) => h1.merge(h2, handleOverriddenHintInfo)) } - private def collectHints(plan: LogicalPlan): Seq[HintInfo] = { + /** + * Extract all hints from the plan, returning a list of extracted hints and the transformed plan + * with [[ResolvedHint]] nodes removed. The returned hint list comes in top-down order. + * Note that hints can only be extracted from under certain nodes. Those that cannot be extracted + * in this method will be cleaned up later by this rule, and may emit warnings depending on the + * configurations. + */ + private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = { plan match { - case h: ResolvedHint => collectHints(h.child) :+ h.hints - case u: UnaryNode => collectHints(u.child) + case h: ResolvedHint => + val (plan, hints) = extractHintsFromPlan(h.child) + (plan, h.hints +: hints) + case u: UnaryNode => + val (plan, hints) = extractHintsFromPlan(u.child) + (u.withNewChildren(Seq(plan)), hints) // TODO revisit this logic: // except and intersect are semi/anti-joins which won't return more data then // their left argument, so the broadcast hint should be propagated here - case i: Intersect => collectHints(i.left) - case e: Except => collectHints(e.left) - case _ => Seq.empty + case i: Intersect => + val (plan, hints) = extractHintsFromPlan(i.left) + (i.copy(left = plan), hints) + case e: Except => + val (plan, hints) = extractHintsFromPlan(e.left) + (e.copy(left = plan), hints) + case p: LogicalPlan => (p, Seq.empty) } } + + private def handleInvalidHintInfo(hint: HintInfo): Unit = { + logWarning(s"A join hint $hint is specified but it is not part of a join relation.") + } + + private def handleOverriddenHintInfo(hint: HintInfo): Unit = { + logWarning(s"Join hint $hint is overridden by another hint and will not take effect.") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index b2ba725e9d44f..870dd87d8a360 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -66,17 +66,94 @@ object JoinHint { /** * The hint attributes to be applied on a specific node. * - * @param broadcast If set to true, it indicates that the broadcast hash join is the preferred join - * strategy and the node with this hint is preferred to be the build side. + * @param strategy The preferred join strategy. */ -case class HintInfo(broadcast: Boolean = false) { +case class HintInfo(strategy: Option[JoinStrategyHint] = None) { - override def toString: String = { - val hints = scala.collection.mutable.ArrayBuffer.empty[String] - if (broadcast) { - hints += "broadcast" + /** + * Combine this [[HintInfo]] with another [[HintInfo]] and return the new [[HintInfo]]. + * @param other the other [[HintInfo]] + * @param hintOverriddenCallback a callback to notify if any [[HintInfo]] has been overridden + * in this merge. + * + * Currently, for join strategy hints, the new [[HintInfo]] will contain the strategy in this + * [[HintInfo]] if defined, otherwise the strategy in the other [[HintInfo]]. 
The + * `hintOverriddenCallback` will be called if this [[HintInfo]] and the other [[HintInfo]] + * both have a strategy defined but the join strategies are different. + */ + def merge(other: HintInfo, hintOverriddenCallback: HintInfo => Unit): HintInfo = { + if (this.strategy.isDefined && + other.strategy.isDefined && + this.strategy.get != other.strategy.get) { + hintOverriddenCallback(other) } - - if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")") + HintInfo(strategy = this.strategy.orElse(other.strategy)) } + + override def toString: String = strategy.map(s => s"(strategy=$s)").getOrElse("none") +} + +sealed abstract class JoinStrategyHint { + + def displayName: String + def hintAliases: Set[String] + + override def toString: String = displayName +} + +/** + * The enumeration of join strategy hints. + * + * The hinted strategy will be used for the join with which it is associated if doable. In case + * of contradicting strategy hints specified for each side of the join, hints are prioritized as + * BROADCAST over SHUFFLE_MERGE over SHUFFLE_HASH over SHUFFLE_REPLICATE_NL. + */ +object JoinStrategyHint { + + val strategies: Set[JoinStrategyHint] = Set( + BROADCAST, + SHUFFLE_MERGE, + SHUFFLE_HASH, + SHUFFLE_REPLICATE_NL) +} + +/** + * The hint for broadcast hash join or broadcast nested loop join, depending on the availability of + * equi-join keys. + */ +case object BROADCAST extends JoinStrategyHint { + override def displayName: String = "broadcast" + override def hintAliases: Set[String] = Set( + "BROADCAST", + "BROADCASTJOIN", + "MAPJOIN") +} + +/** + * The hint for shuffle sort merge join. + */ +case object SHUFFLE_MERGE extends JoinStrategyHint { + override def displayName: String = "merge" + override def hintAliases: Set[String] = Set( + "SHUFFLE_MERGE", + "MERGE", + "MERGEJOIN") +} + +/** + * The hint for shuffle hash join. + */ +case object SHUFFLE_HASH extends JoinStrategyHint { + override def displayName: String = "shuffle_hash" + override def hintAliases: Set[String] = Set( + "SHUFFLE_HASH") +} + +/** + * The hint for shuffle-and-replicate nested loop join, a.k.a. cartesian product join. 
+ */ +case object SHUFFLE_REPLICATE_NL extends JoinStrategyHint { + override def displayName: String = "shuffle_replicate_nl" + override def hintAliases: Set[String] = Set( + "SHUFFLE_REPLICATE_NL") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 563e8adf87edc..474e58a335e7c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable.ArrayBuffer + +import org.apache.log4j.{AppenderSkeleton, Level} +import org.apache.log4j.spi.LoggingEvent + import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal @@ -27,6 +32,14 @@ import org.apache.spark.sql.catalyst.plans.logical._ class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ + class MockAppender extends AppenderSkeleton { + val loggingEvents = new ArrayBuffer[LoggingEvent]() + + override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent) + override def close(): Unit = {} + override def requiresLayout(): Boolean = false + } + test("invalid hints should be ignored") { checkAnalysis( UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), @@ -37,17 +50,17 @@ class ResolveHintsSuite extends AnalysisTest { test("case-sensitive or insensitive parameters") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = true) checkAnalysis( @@ -59,28 +72,29 @@ class ResolveHintsSuite extends AnalysisTest { test("multiple broadcast hint aliases") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), - Join(ResolvedHint(testRelation, HintInfo(broadcast = true)), - ResolvedHint(testRelation2, HintInfo(broadcast = true)), Inner, None, JoinHint.NONE), + Join(ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), + ResolvedHint(testRelation2, HintInfo(strategy = Some(BROADCAST))), + Inner, None, JoinHint.NONE), caseSensitive = false) } test("do not traverse past existing broadcast hints") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table"), - ResolvedHint(table("table").where('a > 1), HintInfo(broadcast = true))), - ResolvedHint(testRelation.where('a > 1), HintInfo(broadcast = true)).analyze, + ResolvedHint(table("table").where('a > 1), HintInfo(strategy = Some(BROADCAST)))), + ResolvedHint(testRelation.where('a > 1), HintInfo(strategy = Some(BROADCAST))).analyze, caseSensitive = false) } test("should work for subqueries") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("tableAlias"), 
table("table").as("tableAlias")), - ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), - ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) // Negative case: if the alias doesn't match, don't match the original table name. @@ -105,7 +119,7 @@ class ResolveHintsSuite extends AnalysisTest { |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable """.stripMargin ), - ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(broadcast = true)) + ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(strategy = Some(BROADCAST))) .select('a).analyze, caseSensitive = false) } @@ -155,4 +169,17 @@ class ResolveHintsSuite extends AnalysisTest { UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")), Seq(errMsgRepa)) } + + test("log warnings for invalid hints") { + val logAppender = new MockAppender() + withLogAppender(logAppender) { + checkAnalysis( + UnresolvedHint("unknown_hint", Seq("TaBlE"), table("TaBlE")), + testRelation, + caseSensitive = false) + } + assert(logAppender.loggingEvents.exists( + e => e.getLevel == Level.WARN && + e.getRenderedMessage.contains("Unrecognized hint: unknown_hint"))) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index bf38189c2fb54..efd05a3e2b3e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -90,61 +90,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } /** - * Select the proper physical plan for join based on joining keys and size of logical plan. - * - * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the - * predicates can be evaluated by matching join keys. If found, join implementations are chosen - * with the following precedence: + * Select the proper physical plan for join based on join strategy hints, the availability of + * equi-join keys and the sizes of joining relations. Below are the existing join strategies, + * their characteristics and their limitations. * * - Broadcast hash join (BHJ): - * BHJ is not supported for full outer join. For right outer join, we only can broadcast the - * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin, - * we only can broadcast the right side. For inner like join, we can broadcast both sides. - * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is - * small. However, broadcasting tables is a network-intensive operation. It could cause OOM - * or perform worse than the other join algorithms, especially when the build/broadcast side - * is big. - * - * For the supported cases, users can specify the broadcast hint (e.g. the user applied the - * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and - * which join side is broadcast. - * - * 1) Broadcast the join side with the broadcast hint, even if the size is larger than - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. 
If both sides have the hint (only when the type - * is inner like join), the side with a smaller estimated physical size will be broadcast. - * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side - * whose estimated physical size is smaller than the threshold. If both sides are below the - * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used. - * - * - Shuffle hash join: if the average size of a single partition is small enough to build a hash - * table. - * - * - Sort merge: if the matching join keys are sortable. - * - * If there is no joining keys, Join implementations are chosen with the following precedence: - * - BroadcastNestedLoopJoin (BNLJ): - * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios: - * For right outer join, the left side is broadcast. For left outer, left semi, left anti - * and the internal join type ExistenceJoin, the right side is broadcast. For inner like - * joins, either side is broadcast. + * Only supported for equi-joins, while the join keys do not need to be sortable. + * Supported for all join types except full outer joins. + * BHJ usually performs faster than the other join algorithms when the broadcast side is + * small. However, broadcasting tables is a network-intensive operation and it could cause + * OOM or perform badly in some cases, especially when the build/broadcast side is big. * - * Like BHJ, users still can specify the broadcast hint and session-based - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast. + * - Shuffle hash join: + * Only supported for equi-joins, while the join keys do not need to be sortable. + * Supported for all join types except full outer joins. * - * 1) Broadcast the join side with the broadcast hint, even if the size is larger than - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for - * inner-like join), the side with a smaller estimated physical size will be broadcast. - * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side - * whose estimated physical size is smaller than the threshold. If both sides are below the - * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used. + * - Shuffle sort merge join (SMJ): + * Only supported for equi-joins and the join keys have to be sortable. + * Supported for all join types. * - * - CartesianProduct: for inner like join, CartesianProduct is the fallback option. + * - Broadcast nested loop join (BNLJ): + * Supports both equi-joins and non-equi-joins. + * Supports all the join types, but the implementation is optimized for: + * 1) broadcasting the left side in a right outer join; + * 2) broadcasting the right side in a left outer, left semi, left anti or existence join; + * 3) broadcasting either side in an inner-like join. * - * - BroadcastNestedLoopJoin (BNLJ): - * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast - * side with the broadcast hint. If neither side has a hint, we broadcast the side with - * the smaller estimated physical size. + * - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join): + * Supports both equi-joins and non-equi-joins. + * Supports only inner like joins. 
*/ object JoinSelection extends Strategy with PredicateHelper { @@ -186,126 +160,218 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => false } - private def broadcastSide( - canBuildLeft: Boolean, - canBuildRight: Boolean, + private def getBuildSide( + wantToBuildLeft: Boolean, + wantToBuildRight: Boolean, left: LogicalPlan, - right: LogicalPlan): BuildSide = { - - def smallerSide = - if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft - - if (canBuildRight && canBuildLeft) { - // Broadcast smaller side base on its estimated physical size - // if both sides have broadcast hint - smallerSide - } else if (canBuildRight) { - BuildRight - } else if (canBuildLeft) { - BuildLeft + right: LogicalPlan): Option[BuildSide] = { + if (wantToBuildLeft && wantToBuildRight) { + // returns the smaller side base on its estimated physical size, if we want to build the + // both sides. + Some(getSmallerSide(left, right)) + } else if (wantToBuildLeft) { + Some(BuildLeft) + } else if (wantToBuildRight) { + Some(BuildRight) } else { - // for the last default broadcast nested loop join - smallerSide + None } } - private def canBroadcastByHints( - joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint): Boolean = { - val buildLeft = canBuildLeft(joinType) && hint.leftHint.exists(_.broadcast) - val buildRight = canBuildRight(joinType) && hint.rightHint.exists(_.broadcast) - buildLeft || buildRight + private def getSmallerSide(left: LogicalPlan, right: LogicalPlan) = { + if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft } - private def broadcastSideByHints( - joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint): BuildSide = { - val buildLeft = canBuildLeft(joinType) && hint.leftHint.exists(_.broadcast) - val buildRight = canBuildRight(joinType) && hint.rightHint.exists(_.broadcast) - broadcastSide(buildLeft, buildRight, left, right) + private def hintToBroadcastLeft(hint: JoinHint): Boolean = { + hint.leftHint.exists(_.strategy.contains(BROADCAST)) } - private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : Boolean = { - val buildLeft = canBuildLeft(joinType) && canBroadcast(left) - val buildRight = canBuildRight(joinType) && canBroadcast(right) - buildLeft || buildRight + private def hintToBroadcastRight(hint: JoinHint): Boolean = { + hint.rightHint.exists(_.strategy.contains(BROADCAST)) } - private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : BuildSide = { - val buildLeft = canBuildLeft(joinType) && canBroadcast(left) - val buildRight = canBuildRight(joinType) && canBroadcast(right) - broadcastSide(buildLeft, buildRight, left, right) + private def hintToShuffleHashLeft(hint: JoinHint): Boolean = { + hint.leftHint.exists(_.strategy.contains(SHUFFLE_HASH)) + } + + private def hintToShuffleHashRight(hint: JoinHint): Boolean = { + hint.rightHint.exists(_.strategy.contains(SHUFFLE_HASH)) + } + + private def hintToSortMergeJoin(hint: JoinHint): Boolean = { + hint.leftHint.exists(_.strategy.contains(SHUFFLE_MERGE)) || + hint.rightHint.exists(_.strategy.contains(SHUFFLE_MERGE)) + } + + private def hintToShuffleReplicateNL(hint: JoinHint): Boolean = { + hint.leftHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL)) || + hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL)) } def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // --- BroadcastHashJoin 
-------------------------------------------------------------------- - - // broadcast hints were specified - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) - if canBroadcastByHints(joinType, left, right, hint) => - val buildSide = broadcastSideByHints(joinType, left, right, hint) - Seq(joins.BroadcastHashJoinExec( - leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) - - // broadcast hints were not specified, so need to infer it from size and configuration. - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _) - if canBroadcastBySizes(joinType, left, right) => - val buildSide = broadcastSideBySizes(joinType, left, right) - Seq(joins.BroadcastHashJoinExec( - leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) - - // --- ShuffledHashJoin --------------------------------------------------------------------- - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _) - if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) - && muchSmaller(right, left) || - !RowOrdering.isOrderable(leftKeys) => - Seq(joins.ShuffledHashJoinExec( - leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right))) - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _) - if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) - && muchSmaller(left, right) || - !RowOrdering.isOrderable(leftKeys) => - Seq(joins.ShuffledHashJoinExec( - leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right))) - - // --- SortMergeJoin ------------------------------------------------------------ - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _) - if RowOrdering.isOrderable(leftKeys) => - joins.SortMergeJoinExec( - leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil - - // --- Without joining keys ------------------------------------------------------------ - - // Pick BroadcastNestedLoopJoin if one side could be broadcast - case j @ logical.Join(left, right, joinType, condition, hint) - if canBroadcastByHints(joinType, left, right, hint) => - val buildSide = broadcastSideByHints(joinType, left, right, hint) - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil - - case j @ logical.Join(left, right, joinType, condition, _) - if canBroadcastBySizes(joinType, left, right) => - val buildSide = broadcastSideBySizes(joinType, left, right) - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil - - // Pick CartesianProduct for InnerJoin - case logical.Join(left, right, _: InnerLike, condition, _) => - joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil + // If it is an equi-join, we first look at the join hints w.r.t. the following order: + // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides + // have the broadcast hints, choose the smaller side (based on stats) to broadcast. + // 2. sort merge hint: pick sort merge join if join keys are sortable. + // 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both + // sides have the shuffle hash hints, choose the smaller side (based on stats) as the + // build side. + // 4. 
shuffle replicate NL hint: pick cartesian product if join type is inner like. + // + // If there is no hint or the hints are not applicable, we follow these rules one by one: + // 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type + // is supported. If both sides are small, choose the smaller side (based on stats) + // to broadcast. + // 2. Pick shuffle hash join if one side is small enough to build local hash map, and is + // much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false. + // 3. Pick sort merge join if the join keys are sortable. + // 4. Pick cartesian product if join type is inner like. + // 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have + // other choice. + case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) => + def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = { + val wantToBuildLeft = canBuildLeft(joinType) && buildLeft + val wantToBuildRight = canBuildRight(joinType) && buildRight + getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide => + Seq(joins.BroadcastHashJoinExec( + leftKeys, + rightKeys, + joinType, + buildSide, + condition, + planLater(left), + planLater(right))) + } + } + + def createShuffleHashJoin(buildLeft: Boolean, buildRight: Boolean) = { + val wantToBuildLeft = canBuildLeft(joinType) && buildLeft + val wantToBuildRight = canBuildRight(joinType) && buildRight + getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide => + Seq(joins.ShuffledHashJoinExec( + leftKeys, + rightKeys, + joinType, + buildSide, + condition, + planLater(left), + planLater(right))) + } + } + + def createSortMergeJoin() = { + if (RowOrdering.isOrderable(leftKeys)) { + Some(Seq(joins.SortMergeJoinExec( + leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)))) + } else { + None + } + } + + def createCartesianProduct() = { + if (joinType.isInstanceOf[InnerLike]) { + Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) + } else { + None + } + } + + def createJoinWithoutHint() = { + createBroadcastHashJoin(canBroadcast(left), canBroadcast(right)) + .orElse { + if (!conf.preferSortMergeJoin) { + createShuffleHashJoin( + canBuildLocalHashMap(left) && muchSmaller(left, right), + canBuildLocalHashMap(right) && muchSmaller(right, left)) + } else { + None + } + } + .orElse(createSortMergeJoin()) + .orElse(createCartesianProduct()) + .getOrElse { + // This join could be very slow or OOM + val buildSide = getSmallerSide(left, right) + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, condition)) + } + } + createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) + .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } + .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint))) + .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } + .getOrElse(createJoinWithoutHint()) + + // If it is not an equi-join, we first look at the join hints w.r.t. the following order: + // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast + // hints, choose the smaller side (based on stats) to broadcast. + // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. 
+ // + // If there is no hint or the hints are not applicable, we follow these rules one by one: + // 1. Pick cartesian product if join type is inner like, and both sides are too big to + // to broadcast. + // 2. Pick broadcast nested loop join. Pick the smaller side (based on stats) to broadcast. case logical.Join(left, right, joinType, condition, hint) => - val buildSide = broadcastSide( - hint.leftHint.exists(_.broadcast), hint.rightHint.exists(_.broadcast), left, right) - // This join could be very slow or OOM - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil + def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { + getBuildSide(buildLeft, buildRight, left, right).map { buildSide => + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, condition)) + } + } - // --- Cases where this strategy does not apply --------------------------------------------- + def createCartesianProduct() = { + if (joinType.isInstanceOf[InnerLike]) { + Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) + } else { + None + } + } + + def createJoinWithoutHint() = { + (if (!canBroadcast(left) && !canBroadcast(right)) createCartesianProduct() else None) + .getOrElse { + // This join could be very slow or OOM + val buildSide = getSmallerSide(left, right) + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, condition)) + } + } + + if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) { + createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) + .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } + .getOrElse(createJoinWithoutHint()) + } else { + val smallerSide = getSmallerSide(left, right) + val buildSide = if (canBuildLeft(joinType)) { + // For RIGHT JOIN, we may broadcast left side even if the hint asks us to broadcast + // the right side. This is for history reasons. + if (hintToBroadcastLeft(hint) || canBroadcast(left)) { + BuildLeft + } else if (hintToBroadcastRight(hint)) { + BuildRight + } else { + smallerSide + } + } else { + // For LEFT JOIN, we may broadcast right side even if the hint asks us to broadcast + // the left side. This is for history reasons. 
+ if (hintToBroadcastRight(hint) || canBroadcast(right)) { + BuildRight + } else if (hintToBroadcastLeft(hint)) { + BuildLeft + } else { + smallerSide + } + } + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, condition)) + } + + // --- Cases where this strategy does not apply --------------------------------------------- case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index bcb57836021e3..7ac3ed5a44f08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint} +import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, ResolvedHint} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction} import org.apache.spark.sql.internal.SQLConf @@ -1045,7 +1045,7 @@ object functions { */ def broadcast[T](df: Dataset[T]): Dataset[T] = { Dataset[T](df.sparkSession, - ResolvedHint(df.logicalPlan, HintInfo(broadcast = true)))(df.exprEnc) + ResolvedHint(df.logicalPlan, HintInfo(strategy = Some(BROADCAST))))(df.exprEnc) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 4d63390fb4524..92157d8ad49ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join} import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan} import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec @@ -951,7 +951,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext case Join(_, _, _, _, hint) => hint } assert(hint.size == 1) - assert(hint(0).leftHint.get.broadcast) + assert(hint(0).leftHint.get.strategy.contains(BROADCAST)) assert(hint(0).rightHint.isEmpty) // Clean-up diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 67f0f1a6fd23d..9c2dc0c62b2ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -17,8 +17,14 @@ package org.apache.spark.sql +import scala.collection.mutable.ArrayBuffer + +import org.apache.log4j.{AppenderSkeleton, Level} +import org.apache.log4j.spi.LoggingEvent + import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.internal.SQLConf 
import org.apache.spark.sql.test.SharedSQLContext @@ -30,6 +36,41 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { lazy val df2 = df.selectExpr("id as b1", "id as b2") lazy val df3 = df.selectExpr("id as c1", "id as c2") + class MockAppender extends AppenderSkeleton { + val loggingEvents = new ArrayBuffer[LoggingEvent]() + + override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent) + override def close(): Unit = {} + override def requiresLayout(): Boolean = false + } + + def msgNoHintRelationFound(relation: String, hint: String): String = + s"Count not find relation '$relation' for join strategy hint '$hint'." + + def msgNoJoinForJoinHint(strategy: String): String = + s"A join hint (strategy=$strategy) is specified but it is not part of a join relation." + + def msgJoinHintOverridden(strategy: String): String = + s"Join hint (strategy=$strategy) is overridden by another hint and will not take effect." + + def verifyJoinHintWithWarnings( + df: => DataFrame, + expectedHints: Seq[JoinHint], + warnings: Seq[String]): Unit = { + val logAppender = new MockAppender() + withLogAppender(logAppender) { + verifyJoinHint(df, expectedHints) + } + val warningMessages = logAppender.loggingEvents + .filter(_.getLevel == Level.WARN) + .map(_.getRenderedMessage) + .filter(_.contains("hint")) + assert(warningMessages.size == warnings.size) + warnings.foreach { w => + assert(warningMessages.contains(w)) + } + } + def verifyJoinHint(df: DataFrame, expectedHints: Seq[JoinHint]): Unit = { val optimized = df.queryExecution.optimizedPlan val joinHints = optimized collect { @@ -43,14 +84,14 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { verifyJoinHint( df.hint("broadcast").join(df, "id"), JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: Nil ) verifyJoinHint( df.join(df.hint("broadcast"), "id"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: Nil + Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil ) } @@ -59,18 +100,18 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { df1.join(df2.hint("broadcast").join(df3, 'b1 === 'c1).hint("broadcast"), 'a1 === 'c1), JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: Nil ) verifyJoinHint( df1.hint("broadcast").join(df2, 'a1 === 'b1).hint("broadcast").join(df3, 'a1 === 'c1), JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: Nil ) } @@ -89,13 +130,13 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { |) b on a.a1 = b.b1 """.stripMargin), JoinHint( - Some(HintInfo(broadcast = true)), - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST))), + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: Nil ) } @@ -112,9 +153,9 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { "where a.a1 = b.b1 and b.b1 = c.c1"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), 
None) :: Nil ) verifyJoinHint( @@ -122,25 +163,25 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { "where a.a1 = b.b1 and b.b1 = c.c1"), JoinHint.NONE :: JoinHint( - Some(HintInfo(broadcast = true)), - Some(HintInfo(broadcast = true))) :: Nil + Some(HintInfo(strategy = Some(BROADCAST))), + Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil ) verifyJoinHint( sql("select /*+ broadcast(b, c)*/ * from a, c, b " + "where a.a1 = b.b1 and b.b1 = c.c1"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( None, - Some(HintInfo(broadcast = true))) :: Nil + Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil ) verifyJoinHint( df1.join(df2, 'a1 === 'b1 && 'a1 > 5).hint("broadcast") .join(df3, 'b1 === 'c1 && 'a1 < 10), JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: JoinHint.NONE :: Nil ) @@ -151,7 +192,7 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { .join(df, 'b1 === 'id), JoinHint.NONE :: JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: JoinHint.NONE :: Nil ) @@ -164,7 +205,7 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { verifyJoinHint( df.hint("broadcast").except(dfSub).join(df, "id"), JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), None) :: JoinHint.NONE :: Nil ) @@ -172,31 +213,112 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { df.join(df.hint("broadcast").intersect(dfSub), "id"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint.NONE :: Nil ) } test("hint merge") { - verifyJoinHint( + verifyJoinHintWithWarnings( df.hint("broadcast").filter('id > 2).hint("broadcast").join(df, "id"), JoinHint( - Some(HintInfo(broadcast = true)), - None) :: Nil + Some(HintInfo(strategy = Some(BROADCAST))), + None) :: Nil, + Nil ) - verifyJoinHint( + verifyJoinHintWithWarnings( df.join(df.hint("broadcast").limit(2).hint("broadcast"), "id"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: Nil + Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil, + Nil + ) + verifyJoinHintWithWarnings( + df.hint("merge").filter('id > 2).hint("shuffle_hash").join(df, "id").hint("broadcast"), + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_HASH))), + None) :: Nil, + msgJoinHintOverridden("merge") :: + msgNoJoinForJoinHint("broadcast") :: Nil + ) + verifyJoinHintWithWarnings( + df.join(df.hint("broadcast").limit(2).hint("merge"), "id") + .hint("shuffle_hash") + .hint("shuffle_replicate_nl") + .join(df, "id"), + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_REPLICATE_NL))), + None) :: + JoinHint( + None, + Some(HintInfo(strategy = Some(SHUFFLE_MERGE)))) :: Nil, + msgJoinHintOverridden("broadcast") :: + msgJoinHintOverridden("shuffle_hash") :: Nil ) } + test("hint merge - SQL") { + withTempView("a", "b", "c") { + df1.createOrReplaceTempView("a") + df2.createOrReplaceTempView("b") + df3.createOrReplaceTempView("c") + verifyJoinHintWithWarnings( + sql("select /*+ shuffle_hash merge(a, c) broadcast(a, b)*/ * from a, b, c " + + "where a.a1 = b.b1 and b.b1 = c.c1"), + JoinHint( + None, + Some(HintInfo(strategy = Some(SHUFFLE_MERGE)))) :: + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_MERGE))), + Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil, + msgNoJoinForJoinHint("shuffle_hash") :: + msgJoinHintOverridden("broadcast") :: Nil + ) + verifyJoinHintWithWarnings( + sql("select /*+ 
shuffle_hash(a, b) merge(b, d) broadcast(b)*/ * from a, b, c " + + "where a.a1 = b.b1 and b.b1 = c.c1"), + JoinHint.NONE :: + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_HASH))), + Some(HintInfo(strategy = Some(SHUFFLE_HASH)))) :: Nil, + msgNoHintRelationFound("d", "merge(b, d)") :: + msgJoinHintOverridden("broadcast") :: + msgJoinHintOverridden("merge") :: Nil + ) + verifyJoinHintWithWarnings( + sql( + """ + |select /*+ broadcast(a, c) merge(a, d)*/ * from a + |join ( + | select /*+ shuffle_hash(c) shuffle_replicate_nl(b, c)*/ * from b + | join c on b.b1 = c.c1 + |) as d + |on a.a2 = d.b2 + """.stripMargin), + JoinHint( + Some(HintInfo(strategy = Some(BROADCAST))), + Some(HintInfo(strategy = Some(SHUFFLE_MERGE)))) :: + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_REPLICATE_NL))), + Some(HintInfo(strategy = Some(SHUFFLE_HASH)))) :: Nil, + msgNoHintRelationFound("c", "broadcast(a, c)") :: + msgJoinHintOverridden("merge") :: + msgJoinHintOverridden("shuffle_replicate_nl") :: Nil + ) + } + } + test("nested hint") { verifyJoinHint( df.hint("broadcast").hint("broadcast").filter('id > 2).join(df, "id"), JoinHint( - Some(HintInfo(broadcast = true)), + Some(HintInfo(strategy = Some(BROADCAST))), + None) :: Nil + ) + verifyJoinHint( + df.hint("shuffle_hash").hint("broadcast").hint("merge").filter('id > 2).join(df, "id"), + JoinHint( + Some(HintInfo(strategy = Some(SHUFFLE_MERGE))), None) :: Nil ) } @@ -209,12 +331,230 @@ class JoinHintSuite extends PlanTest with SharedSQLContext { join.join(broadcasted, "id").join(broadcasted, "id"), JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint( None, - Some(HintInfo(broadcast = true))) :: + Some(HintInfo(strategy = Some(BROADCAST)))) :: JoinHint.NONE :: JoinHint.NONE :: JoinHint.NONE :: Nil ) } } + + def equiJoinQueryWithHint(hints: Seq[String], joinType: String = "INNER"): String = + hints.map("/*+ " + _ + " */").mkString( + "SELECT ", " ", s" * FROM t1 $joinType JOIN t2 ON t1.key = t2.key") + + def nonEquiJoinQueryWithHint(hints: Seq[String], joinType: String = "INNER"): String = + hints.map("/*+ " + _ + " */").mkString( + "SELECT ", " ", s" * FROM t1 $joinType JOIN t2 ON t1.key > t2.key") + + private def assertBroadcastHashJoin(df: DataFrame, buildSide: BuildSide): Unit = { + val executedPlan = df.queryExecution.executedPlan + val broadcastHashJoins = executedPlan.collect { + case b: BroadcastHashJoinExec => b + } + assert(broadcastHashJoins.size == 1) + assert(broadcastHashJoins.head.buildSide == buildSide) + } + + private def assertBroadcastNLJoin(df: DataFrame, buildSide: BuildSide): Unit = { + val executedPlan = df.queryExecution.executedPlan + val broadcastNLJoins = executedPlan.collect { + case b: BroadcastNestedLoopJoinExec => b + } + assert(broadcastNLJoins.size == 1) + assert(broadcastNLJoins.head.buildSide == buildSide) + } + + private def assertShuffleHashJoin(df: DataFrame, buildSide: BuildSide): Unit = { + val executedPlan = df.queryExecution.executedPlan + val shuffleHashJoins = executedPlan.collect { + case s: ShuffledHashJoinExec => s + } + assert(shuffleHashJoins.size == 1) + assert(shuffleHashJoins.head.buildSide == buildSide) + } + + private def assertShuffleMergeJoin(df: DataFrame): Unit = { + val executedPlan = df.queryExecution.executedPlan + val shuffleMergeJoins = executedPlan.collect { + case s: SortMergeJoinExec => s + } + assert(shuffleMergeJoins.size == 1) + } + + private def assertShuffleReplicateNLJoin(df: DataFrame): Unit = { + val executedPlan = 
df.queryExecution.executedPlan + val shuffleReplicateNLJoins = executedPlan.collect { + case c: CartesianProductExec => c + } + assert(shuffleReplicateNLJoins.size == 1) + } + + test("join strategy hint - broadcast") { + withTempView("t1", "t2") { + Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1") + Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2") + + val t1Size = spark.table("t1").queryExecution.analyzed.children.head.stats.sizeInBytes + val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes + assert(t1Size < t2Size) + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + // Broadcast hint specified on one side + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: Nil)), BuildLeft) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("BROADCAST(t2)" :: Nil)), BuildRight) + + // Determine build side based on the join type and child relation sizes + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1, t2)" :: Nil)), BuildLeft) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("BROADCAST(t1, t2)" :: Nil, "left")), BuildRight) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("BROADCAST(t1, t2)" :: Nil, "right")), BuildLeft) + + // Use broadcast-hash join if hinted "broadcast" and equi-join + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)), BuildRight) + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: "MERGE(t1, t2)" :: Nil)), BuildLeft) + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: "SHUFFLE_REPLICATE_NL(t2)" :: Nil)), + BuildLeft) + + // Use broadcast-nl join if hinted "broadcast" and non-equi-join + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: "BROADCAST(t1)" :: Nil)), BuildLeft) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("MERGE(t1)" :: "BROADCAST(t2)" :: Nil)), BuildRight) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1)" :: "BROADCAST(t2)" :: Nil)), + BuildRight) + + // Broadcast hint specified but not doable + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: Nil, "left"))) + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("BROADCAST(t2)" :: Nil, "right"))) + } + } + } + + test("join strategy hint - shuffle-merge") { + withTempView("t1", "t2") { + Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1") + Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2") + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Int.MaxValue.toString) { + // Shuffle-merge hint specified on one side + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("SHUFFLE_MERGE(t1)" :: Nil))) + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("MERGEJOIN(t2)" :: Nil))) + + // Shuffle-merge hint specified on both sides + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("MERGE(t1, t2)" :: Nil))) + + // Shuffle-merge hint prioritized over shuffle-hash hint and shuffle-replicate-nl hint + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "MERGE(t1)" :: Nil, "left"))) + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("MERGE(t2)" :: "SHUFFLE_HASH(t1)" :: Nil, "right"))) + + // Broadcast hint prioritized over shuffle-merge hint, but broadcast hint is not applicable + assertShuffleMergeJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: "MERGE(t2)" :: Nil, "left"))) + assertShuffleMergeJoin( + 
sql(equiJoinQueryWithHint("BROADCAST(t2)" :: "MERGE(t1)" :: Nil, "right"))) + + // Shuffle-merge hint specified but not doable + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("MERGE(t1, t2)" :: Nil, "left")), BuildRight) + } + } + } + + test("join strategy hint - shuffle-hash") { + withTempView("t1", "t2") { + Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1") + Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2") + + val t1Size = spark.table("t1").queryExecution.analyzed.children.head.stats.sizeInBytes + val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes + assert(t1Size < t2Size) + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Int.MaxValue.toString) { + // Shuffle-hash hint specified on one side + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil)), BuildRight) + + // Determine build side based on the join type and child relation sizes + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft) + + // Shuffle-hash hint prioritized over shuffle-replicate-nl hint + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)), + BuildLeft) + + // Broadcast hint prioritized over shuffle-hash hint, but broadcast hint is not applicable + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t1)" :: "SHUFFLE_HASH(t2)" :: Nil, "left")), + BuildRight) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("BROADCAST(t2)" :: "SHUFFLE_HASH(t1)" :: Nil, "right")), + BuildLeft) + + // Shuffle-hash hint specified but not doable + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight) + assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft) + } + } + } + + test("join strategy hint - shuffle-replicate-nl") { + withTempView("t1", "t2") { + Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1") + Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2") + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Int.MaxValue.toString) { + // Shuffle-replicate-nl hint specified on one side + assertShuffleReplicateNLJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1)" :: Nil))) + assertShuffleReplicateNLJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: Nil))) + + // Shuffle-replicate-nl hint specified on both sides + assertShuffleReplicateNLJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil))) + + // Shuffle-merge hint prioritized over shuffle-replicate-nl hint, but shuffle-merge hint + // is not applicable + assertShuffleReplicateNLJoin( + sql(nonEquiJoinQueryWithHint("MERGE(t1)" :: "SHUFFLE_REPLICATE_NL(t2)" :: Nil))) + + // Shuffle-hash hint prioritized over shuffle-replicate-nl hint, but shuffle-hash hint is + // not applicable + assertShuffleReplicateNLJoin( + sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: "SHUFFLE_REPLICATE_NL(t1)" :: Nil))) + + // Shuffle-replicate-nl hint specified but not doable + assertBroadcastHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil, "left")), BuildRight) + 
assertBroadcastNLJoin( + sql(nonEquiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil, "right")), BuildLeft) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index f238148e61c39..05c583c80e505 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -22,6 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.AccumulatorSuite import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft} +import org.apache.spark.sql.catalyst.plans.logical.BROADCAST import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.EnsureRequirements @@ -216,10 +217,10 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { val plan3 = sql(s"SELECT /*+ $name(v) */ * FROM t JOIN u ON t.id = u.id").queryExecution .optimizedPlan - assert(plan1.asInstanceOf[Join].hint.leftHint.get.broadcast) + assert(plan1.asInstanceOf[Join].hint.leftHint.get.strategy.contains(BROADCAST)) assert(plan1.asInstanceOf[Join].hint.rightHint.isEmpty) assert(plan2.asInstanceOf[Join].hint.leftHint.isEmpty) - assert(plan2.asInstanceOf[Join].hint.rightHint.get.broadcast) + assert(plan2.asInstanceOf[Join].hint.rightHint.get.strategy.contains(BROADCAST)) assert(plan3.asInstanceOf[Join].hint.leftHint.isEmpty) assert(plan3.asInstanceOf[Join].hint.rightHint.isEmpty) } From 4ec7f631aa23bc0121656dd77f05767f447112c0 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 11 Apr 2019 13:43:44 -0500 Subject: [PATCH 005/181] [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition ## What changes were proposed in this pull request? Fix build warnings -- see some details below. But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds". Which, I'd like to simplify to things like "2.minutes" anyway. ## How was this patch tested? Existing tests. Closes #24314 from srowen/SPARK-27404. 
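For illustration only (not part of this patch), a minimal sketch of the two spellings the message contrasts, assuming nothing beyond `scala.concurrent.duration._`:

```scala
import scala.concurrent.duration._
import scala.language.postfixOps // needed only for the postfix spelling below

// Postfix call that triggers the build warning when the language import is missing.
val before = 120000 milliseconds

// Explicit form this change moves to: no extra language import, clearer units.
val after = 2.minutes

assert(before == after) // both denote the same two-minute duration
```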
Authored-by: Sean Owen Signed-off-by: Sean Owen --- .../shuffle/RetryingBlockFetcherSuite.java | 8 +-- .../org/apache/spark/BarrierTaskContext.scala | 3 +- .../spark/deploy/FaultToleranceTest.scala | 17 +++--- .../org/apache/spark/rpc/RpcTimeout.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../scala/org/apache/spark/ui/UIUtils.scala | 19 +++--- .../spark/BarrierStageOnSubmittedSuite.scala | 3 +- .../apache/spark/ContextCleanerSuite.scala | 16 ++--- .../scala/org/apache/spark/DriverSuite.scala | 4 +- .../apache/spark/JobCancellationSuite.scala | 4 +- .../org/apache/spark/SparkConfSuite.scala | 5 +- .../org/apache/spark/StatusTrackerSuite.scala | 29 +++++----- .../spark/deploy/SparkSubmitSuite.scala | 2 +- .../StandaloneDynamicAllocationSuite.scala | 2 +- .../spark/deploy/client/AppClientSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 11 ++-- .../deploy/history/HistoryServerSuite.scala | 20 +------ .../spark/deploy/master/MasterSuite.scala | 11 ++-- .../spark/deploy/worker/WorkerSuite.scala | 6 +- .../apache/spark/executor/ExecutorSuite.scala | 1 - .../spark/launcher/LauncherBackendSuite.scala | 5 +- .../spark/rdd/AsyncRDDActionsSuite.scala | 4 +- .../spark/rdd/LocalCheckpointSuite.scala | 3 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 55 +++++++++--------- .../CoarseGrainedSchedulerBackendSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../OutputCommitCoordinatorSuite.scala | 3 +- .../scheduler/SchedulerIntegrationSuite.scala | 2 +- .../scheduler/TaskResultGetterSuite.scala | 7 +-- .../BlockManagerReplicationSuite.scala | 15 +++-- .../spark/storage/BlockManagerSuite.scala | 22 +++---- .../org/apache/spark/ui/UISeleniumSuite.scala | 54 ++++++++--------- .../scala/org/apache/spark/ui/UISuite.scala | 4 +- .../apache/spark/util/EventLoopSuite.scala | 19 +++--- .../ExternalAppendOnlyMapSuite.scala | 3 +- .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 2 +- .../KafkaDontFailOnDataLossSuite.scala | 2 +- .../spark/sql/kafka010/KafkaTestUtils.scala | 9 ++- .../kafka010/DirectKafkaStreamSuite.scala | 21 ++++--- .../kinesis/KinesisCheckpointerSuite.scala | 11 ++-- .../kinesis/KinesisReceiverSuite.scala | 6 +- .../kinesis/KinesisStreamSuite.scala | 42 ++++++++------ .../org/apache/spark/ml/MLEventsSuite.scala | 41 +++++++------ .../deploy/yarn/BaseYarnClusterSuite.scala | 4 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 8 +-- .../yarn/YarnShuffleServiceSuite.scala | 5 +- .../sql/sources/v2/JavaSimpleBatchTable.java | 54 ----------------- .../sources/v2/JavaSimpleReaderFactory.java | 50 ++++++++++++++++ .../sql/sources/v2/JavaSimpleScanBuilder.java | 47 +++++++++++++++ .../apache/spark/sql/CachedTableSuite.scala | 11 ++-- .../apache/spark/sql/DatasetCacheSuite.scala | 4 +- .../sources/TextSocketStreamSuite.scala | 2 +- .../state/StateStoreCoordinatorSuite.scala | 8 +-- .../streaming/state/StateStoreSuite.scala | 2 +- .../StreamingQueryManagerSuite.scala | 58 +++++++++---------- ...StreamingQueryStatusAndProgressSuite.scala | 3 +- .../hive/thriftserver/UISeleniumSuite.scala | 4 +- .../receiver/ReceivedBlockHandler.scala | 2 +- .../util/FileBasedWriteAheadLog.scala | 5 +- .../spark/streaming/CheckpointSuite.scala | 4 +- .../streaming/ReceivedBlockHandlerSuite.scala | 3 +- .../streaming/ReceivedBlockTrackerSuite.scala | 4 +- .../spark/streaming/ReceiverSuite.scala | 22 +++---- .../streaming/StreamingContextSuite.scala | 52 ++++++++--------- .../streaming/StreamingListenerSuite.scala | 4 +- .../spark/streaming/UISeleniumSuite.scala | 8 +-- 
.../receiver/BlockGeneratorSuite.scala | 16 ++--- .../ExecutorAllocationManagerSuite.scala | 2 +- .../scheduler/JobGeneratorSuite.scala | 5 +- .../scheduler/ReceiverTrackerSuite.scala | 8 +-- .../streaming/util/WriteAheadLogSuite.scala | 20 +++---- 71 files changed, 458 insertions(+), 459 deletions(-) create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index a530e16734db4..6f90df5f611a9 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -46,9 +46,9 @@ */ public class RetryingBlockFetcherSuite { - ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13])); - ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); - ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19])); + private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13])); + private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); + private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19])); @Test public void testNoFailures() throws IOException, InterruptedException { @@ -291,7 +291,7 @@ private static void performInteractions(List> inte } assertNotNull(stub); - stub.when(fetchStarter).createAndStart(any(), anyObject()); + stub.when(fetchStarter).createAndStart(any(), any()); String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); } diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 2d842b98ead43..a354f44a1be19 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -20,7 +20,6 @@ package org.apache.spark import java.util.{Properties, Timer, TimerTask} import scala.concurrent.duration._ -import scala.language.postfixOps import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.executor.TaskMetrics @@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] ( barrierEpoch), // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. 
- timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout")) + timeout = new RpcTimeout(365.days, "barrierTimeout")) barrierEpoch += 1 logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " + "global sync successfully, waited for " + diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index a66243012041c..99f841234005e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Promise} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import scala.language.postfixOps import scala.sys.process._ import org.json4s._ @@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging { assertValidClusterState() killLeader() - delay(30 seconds) + delay(30.seconds) assertValidClusterState() createClient() assertValidClusterState() @@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging { killLeader() addMasters(1) - delay(30 seconds) + delay(30.seconds) assertValidClusterState() killLeader() addMasters(1) - delay(30 seconds) + delay(30.seconds) assertValidClusterState() } @@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging { killLeader() workers.foreach(_.kill()) workers.clear() - delay(30 seconds) + delay(30.seconds) addWorkers(2) assertValidClusterState() } @@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging { (1 to 3).foreach { _ => killLeader() - delay(30 seconds) + delay(30.seconds) assertValidClusterState() assertTrue(getLeader == masters.head) addMasters(1) @@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging { } // Avoid waiting indefinitely (e.g., we could register but get no executors). 
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 2.minutes)) } /** @@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging { } try { - assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 2.minutes)) } catch { case e: TimeoutException => logError("Master states: " + masters.map(_.state)) @@ -421,7 +420,7 @@ private object SparkDocker { } dockerCmd.run(ProcessLogger(findIpAndLog _)) - val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds) + val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds) val dockerId = Docker.getLastProcessId (ip, dockerId, outFile) } diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala index 3dc41f7f12798..770ae2f1dd22f 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala @@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S * * @note This can be used in the recover callback of a Future to add to a TimeoutException * Example: - * val timeout = new RpcTimeout(5 millis, "short timeout") + * val timeout = new RpcTimeout(5.milliseconds, "short timeout") * Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout) */ def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d967d38c52631..524b0c4f6c3ac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -27,7 +27,6 @@ import scala.collection.Map import scala.collection.mutable.{ArrayStack, HashMap, HashSet} import scala.concurrent.duration._ import scala.language.existentials -import scala.language.postfixOps import scala.util.control.NonFatal import org.apache.commons.lang3.SerializationUtils @@ -270,7 +269,7 @@ private[spark] class DAGScheduler( listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorUpdates))) blockManagerMaster.driverEndpoint.askSync[Boolean]( - BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) + BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat")) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index d7bda8b80c24f..11647c0c7c623 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging { } } // if time is more than a year - return s"$yearString $weekString $dayString" + s"$yearString $weekString $dayString" } catch { case e: Exception => logError("Error converting time to string", e) // if there is some error, return blank string - return "" + "" } } @@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging { def getHeaderContent(header: String): Seq[Node] = { if (newlinesInHeader) {
-          { header.split("\n").map { case t =>
-            <li> {t} </li>
-          } }
+          { header.split("\n").map(t =>
+            <li> {t} </li>
+          ) }
} else { Text(header) @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging { * the whole string will rendered as a simple escaped text. * * Note: In terms of security, only anchor tags with root relative links are supported. So any - * attempts to embed links outside Spark UI, or other tags like {@code