[SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings #18600
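Background for the change below: roughly speaking, sameResult compares canonicalized copies of the two plans, and for a case-class plan node that comes down to field-by-field equality, so the constructor fields of RowDataSourceScanExec decide whether two scans are recognised as equal. The removed TODO(SPARK-17701) comment in the diff spells out the problem: the pushed filters were carried only as rendered strings inside metadata: Map[String, String]. The toy sketch below is not Spark's real classes or API; it only illustrates the before/after shape of what ends up being compared.

```scala
// Toy sketch, not Spark's real classes: what sameResult effectively compares
// before and after this refactoring.
sealed trait ToyScan { def canonicalized: ToyScan }

// Before (simplified): pushed filters exist only as a rendered string inside
// `metadata`, so two equivalent scans match only if those strings are identical.
case class OldRowScan(metadata: Map[String, String]) extends ToyScan {
  def canonicalized: ToyScan = this
}

// After (simplified): required columns and pushed filters are structured values
// (Seq[Int], Set[...]) and compare by value, with no string rendering involved.
case class NewRowScan(requiredColumnsIndex: Seq[Int], filters: Set[String]) extends ToyScan {
  def canonicalized: ToyScan = this
}

// sameResult, in miniature: equality of the canonicalized plans.
def sameResult(a: ToyScan, b: ToyScan): Boolean = a.canonicalized == b.canonicalized
```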
Changes from 1 commit
@@ -33,19 +33,18 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
  val relation: BaseRelation
  val metastoreTableIdentifier: Option[TableIdentifier]
  val tableIdentifier: Option[TableIdentifier]

  protected val nodeNamePrefix: String = ""

  override val nodeName: String = {
    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
    s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
  }

  override def simpleString: String = {
@@ -73,34 +72,24 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
    output: Seq[Attribute],
    fullOutput: Seq[Attribute],
    requiredColumnsIndex: Seq[Int],
    filters: Set[Filter],
    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
    override val outputPartitioning: Partitioning,
    override val metadata: Map[String, String],

Review comment: uh... This is not being used after our previous refactoring.

    override val metastoreTableIdentifier: Option[TableIdentifier])
    override val tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)

  override lazy val metrics =
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  val outputUnsafeRows = relation match {
    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
      !SparkSession.getActiveSession.get.sessionState.conf.getConf(
        SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
    case _: HadoopFsRelation => true
    case _ => false
  }

  protected override def doExecute(): RDD[InternalRow] = {
    val unsafeRow = if (outputUnsafeRows) {
      rdd
    } else {
      rdd.mapPartitionsWithIndexInternal { (index, iter) =>
        val proj = UnsafeProjection.create(schema)
        proj.initialize(index)
        iter.map(proj)
      }
    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map(proj)
    }

    val numOutputRows = longMetric("numOutputRows")
@@ -126,24 +115,22 @@ case class RowDataSourceScanExec(
    ctx.INPUT_ROW = row
    ctx.currentVars = null
    val columnsRowInput = exprRows.map(_.genCode(ctx))
    val inputRow = if (outputUnsafeRows) row else null
    s"""
       |while ($input.hasNext()) {
       |  InternalRow $row = (InternalRow) $input.next();
       |  $numOutputRows.add(1);
       |  ${consume(ctx, columnsRowInput, inputRow).trim}
       |  ${consume(ctx, columnsRowInput, null).trim}
       |  if (shouldStop()) return;
       |}
     """.stripMargin
  }

  // Only care about `relation` and `metadata` when canonicalizing.
  // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
  override lazy val canonicalized: SparkPlan =
    copy(
      output.map(QueryPlan.normalizeExprId(_, output)),
      fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
      rdd = null,
      outputPartitioning = null,
      metastoreTableIdentifier = None)
      tableIdentifier = None)
}

/**
@@ -154,15 +141,15 @@ case class RowDataSourceScanExec(
 * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
 * @param partitionFilters Predicates to use for partition pruning.
 * @param dataFilters Filters on non-partition columns.
 * @param metastoreTableIdentifier identifier for the table in the metastore.
 * @param tableIdentifier identifier for the table in the metastore.
 */
case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    override val metastoreTableIdentifier: Option[TableIdentifier])
    override val tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec with ColumnarBatchScan {

  val supportsBatch: Boolean = relation.fileFormat.supportBatch(
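One detail worth calling out from the diff above: output is now derived rather than passed in, via requiredColumnsIndex.map(fullOutput). The snippet below only illustrates that indexing trick, with plain Strings standing in for Catalyst Attributes: a Seq is itself a function from index to element, so mapping the index list through it selects the needed columns by position, and a Seq[Int] compares by value during canonicalization instead of by rendered string.

```scala
// Illustration only: Strings stand in for Catalyst Attributes.
val fullOutput = Seq("id", "name", "age")          // every column the relation exposes
val requiredColumnsIndex = Seq(0, 2)               // positions of the columns this scan needs

// A Seq[A] is an (Int => A) function, so mapping the indices through it
// picks out the attributes at those positions.
val output = requiredColumnsIndex.map(fullOutput)  // Seq("id", "age")
```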
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources

import java.util.concurrent.Callable

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

@@ -288,10 +286,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
      case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
        RowDataSourceScanExec(
          l.output,
          l.output.indices,
          Set.empty,
          toCatalystRDD(l, baseRelation.buildScan()),
          baseRelation,
          UnknownPartitioning(0),
          Map.empty,
          None) :: Nil

      case _ => Nil
@@ -354,36 +352,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
    val (unhandledPredicates, pushedFilters, handledFilters) =
      selectFilters(relation.relation, candidatePredicates)

    // A set of column attributes that are only referenced by pushed down filters. We can eliminate
    // them from requested columns.
    val handledSet = {
      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
      AttributeSet(handledPredicates.flatMap(_.references)) --
        (projectSet ++ unhandledSet).map(relation.attributeMap)
    }

    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
    // `Filter`s or cannot be handled by `relation`
    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

    // These metadata values make scan plans uniquely identifiable for equality checking.
    // TODO(SPARK-17701) using strings for equality checking is brittle
    val metadata: Map[String, String] = {

Review comment: We need to keep it.
Review comment: The target of this cleanup PR is to remove the metadata...

      val pairs = ArrayBuffer.empty[(String, String)]

      // Mark filters which are handled by the underlying DataSource with an Astrisk
      if (pushedFilters.nonEmpty) {
        val markedFilters = for (filter <- pushedFilters) yield {
          if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
        }
        pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
      }
      pairs += ("ReadSchema" ->
        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
      pairs.toMap
    }

    if (projects.map(_.toAttribute) == projects &&
      projectSet.size == projects.size &&
      filterSet.subsetOf(projectSet)) {
@@ -395,25 +367,33 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
        .asInstanceOf[Seq[Attribute]]
        // Match original case of attributes.
        .map(relation.attributeMap)
        // Don't request columns that are only referenced by pushed filters.
        .filterNot(handledSet.contains)

Review comment: in this branch,

      val scan = RowDataSourceScanExec(
        projects.map(_.toAttribute),
        relation.output,
        requestedColumns.map(relation.output.indexOf),
        pushedFilters.toSet,
        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
        relation.relation, UnknownPartitioning(0), metadata,
        relation.catalogTable.map(_.identifier))
        relation.relation, relation.catalogTable.map(_.identifier))

Review comment: nit; can we make this into two lines during refactoring?

      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
    } else {
      // A set of column attributes that are only referenced by pushed down filters. We can
      // eliminate them from requested columns.
      val handledSet = {
        val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
        val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
        AttributeSet(handledPredicates.flatMap(_.references)) --
          (projectSet ++ unhandledSet).map(relation.attributeMap)
      }
      // Don't request columns that are only referenced by pushed filters.
      val requestedColumns =
        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

      val scan = RowDataSourceScanExec(
        requestedColumns,
        relation.output,
        requestedColumns.map(relation.output.indexOf),
        pushedFilters.toSet,
        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
        relation.relation, UnknownPartitioning(0), metadata,
        relation.catalogTable.map(_.identifier))
        relation.relation, relation.catalogTable.map(_.identifier))

Review comment: ditto.

      execution.ProjectExec(
        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
    }
Review comment: Start it in this PR?
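For readers following the requestedColumns/handledSet logic that the diff moves into the else branch, here is a small worked example of the set arithmetic. It is purely illustrative: plain Sets of column names stand in for Catalyst AttributeSets and the attributeMap lookups, and the column names are made up.

```scala
// Illustration only: plain Sets of column names stand in for Catalyst AttributeSets.
// Query shape: SELECT a FROM t WHERE b > 1, where the source fully handles `b > 1`.
val projectSet    = Set("a")          // columns the projection needs
val filterSet     = Set("b")          // columns referenced by all filter predicates
val handledRefs   = Set("b")          // columns referenced by predicates the source handles
val unhandledRefs = Set.empty[String] // columns referenced by predicates Spark must re-check

// Columns referenced *only* by handled (pushed-down) predicates can be dropped.
val handledSet = handledRefs -- (projectSet ++ unhandledRefs)      // Set("b")

// Don't request columns that are only referenced by pushed filters.
val requestedColumns = (projectSet ++ filterSet) -- handledSet     // Set("a")
```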