[SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings #18600

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
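The thrust of the change, per SPARK-17701: RowDataSourceScanExec previously carried a metadata: Map[String, String] (with entries such as "PushedFilters" and "ReadSchema") that fed into plan equality, so sameResult ended up comparing formatted strings. This commit replaces the string map with structured fields (fullOutput, requiredColumnsIndex, filters: Set[Filter]) and canonicalizes those instead. The toy sketch below is not Spark code and uses invented names; it only illustrates why structured equality is the safer contract:

    // Toy illustration (invented names, not Spark classes): comparing formatted strings
    // is order- and formatting-sensitive, while structured fields compare by value.
    case class StringScan(metadata: Map[String, String])
    case class StructuredScan(requiredColumnsIndex: Seq[Int], filters: Set[String])

    val a = StringScan(Map("PushedFilters" -> "[IsNotNull(a), GreaterThan(a,1)]"))
    val b = StringScan(Map("PushedFilters" -> "[GreaterThan(a,1), IsNotNull(a)]"))
    // a == b is false even though both scans push the same two filters.

    val c = StructuredScan(Seq(0), Set("IsNotNull(a)", "GreaterThan(a,1)"))
    val d = StructuredScan(Seq(0), Set("GreaterThan(a,1)", "IsNotNull(a)"))
    // c == d is true: a Set does not care about insertion order.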
@@ -33,19 +33,18 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
val relation: BaseRelation
val metastoreTableIdentifier: Option[TableIdentifier]
val tableIdentifier: Option[TableIdentifier]

protected val nodeNamePrefix: String = ""

override val nodeName: String = {
s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
}

override def simpleString: String = {
@@ -73,34 +72,24 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
output: Seq[Attribute],
fullOutput: Seq[Attribute],
requiredColumnsIndex: Seq[Int],
filters: Set[Filter],
Review comment (Member): Start it in this PR?

rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String],
Review comment (Member): uh... This is not being used after our previous refactoring.
Review comment (Member): metadata is still needed. It is being used here.

override val metastoreTableIdentifier: Option[TableIdentifier])
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec {

def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
Review comment (dongjoon-hyun, Member, Jul 11, 2017): def output: Seq[Attribute] -> override def output: Seq[Attribute]?

override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
!SparkSession.getActiveSession.get.sessionState.conf.getConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
Review comment (Contributor Author): HadoopFsRelation never goes into RowDataSourceScanExec

case _ => false
}

protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
} else {
rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}

val numOutputRows = longMetric("numOutputRows")
@@ -126,24 +115,22 @@ case class RowDataSourceScanExec(
ctx.INPUT_ROW = row
ctx.currentVars = null
val columnsRowInput = exprRows.map(_.genCode(ctx))
val inputRow = if (outputUnsafeRows) row else null
s"""
|while ($input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next();
| $numOutputRows.add(1);
| ${consume(ctx, columnsRowInput, inputRow).trim}
| ${consume(ctx, columnsRowInput, null).trim}
| if (shouldStop()) return;
|}
""".stripMargin
}

// Only care about `relation` and `metadata` when canonicalizing.
// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
override lazy val canonicalized: SparkPlan =
copy(
output.map(QueryPlan.normalizeExprId(_, output)),
fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
rdd = null,
outputPartitioning = null,
metastoreTableIdentifier = None)
tableIdentifier = None)
}
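The canonicalized override above is what makes the refactor pay off: Spark treats two plans as producing the same result when their canonicalized forms are equal, and the canonical form of this node now keeps only normalized output attributes, the required column indices and the Filter set, while rdd and tableIdentifier are blanked out. A toy model of that contract follows (invented names, not the actual QueryPlan/SparkPlan API; the real sameResult machinery lives in QueryPlan):

    // Toy model of the canonicalization contract: drop fields that do not affect the
    // result, then rely on plain case-class equality for sameResult.
    final case class ToyScan(
        fullOutput: Seq[String],
        requiredColumnsIndex: Seq[Int],
        filters: Set[String],
        tableIdentifier: Option[String]) {
      // Mirrors copy(..., tableIdentifier = None) above: the table name a scan was
      // registered under does not change what the scan returns.
      def canonicalizedScan: ToyScan = copy(tableIdentifier = None)
      def sameResult(other: ToyScan): Boolean = canonicalizedScan == other.canonicalizedScan
    }

    // The same scan reached through two different table identifiers still compares equal:
    // ToyScan(Seq("a", "b"), Seq(0), Set("IsNotNull(a)"), Some("db.t1"))
    //   .sameResult(ToyScan(Seq("a", "b"), Seq(0), Set("IsNotNull(a)"), Some("db.t2")))  // true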

/**
@@ -154,15 +141,15 @@ case class RowDataSourceScanExec(
* @param requiredSchema Required schema of the underlying relation, excluding partition columns.
* @param partitionFilters Predicates to use for partition pruning.
* @param dataFilters Filters on non-partition columns.
* @param metastoreTableIdentifier identifier for the table in the metastore.
* @param tableIdentifier identifier for the table in the metastore.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {

val supportsBatch: Boolean = relation.fileFormat.supportBatch(
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources

import java.util.concurrent.Callable

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -288,10 +286,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
RowDataSourceScanExec(
l.output,
l.output.indices,
Set.empty,
toCatalystRDD(l, baseRelation.buildScan()),
baseRelation,
UnknownPartitioning(0),
Map.empty,
None) :: Nil

case _ => Nil
@@ -354,36 +352,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
val (unhandledPredicates, pushedFilters, handledFilters) =
selectFilters(relation.relation, candidatePredicates)

// A set of column attributes that are only referenced by pushed down filters. We can eliminate
// them from requested columns.
val handledSet = {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
AttributeSet(handledPredicates.flatMap(_.references)) --
(projectSet ++ unhandledSet).map(relation.attributeMap)
}

// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

// These metadata values make scan plans uniquely identifiable for equality checking.
// TODO(SPARK-17701) using strings for equality checking is brittle
val metadata: Map[String, String] = {
Review comment (Member): We need to keep it.
Review comment (Contributor Author): The target of this cleanup PR is to remove the metadata...

val pairs = ArrayBuffer.empty[(String, String)]

// Mark filters which are handled by the underlying DataSource with an Astrisk
if (pushedFilters.nonEmpty) {
val markedFilters = for (filter <- pushedFilters) yield {
if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
}
pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
}
pairs += ("ReadSchema" ->
StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
pairs.toMap
}

if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
@@ -395,25 +367,33 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
.asInstanceOf[Seq[Attribute]]
// Match original case of attributes.
.map(relation.attributeMap)
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
Review comment (Contributor Author): in this branch, filterSet is a subset of projectSet, so it's a no-op.

val scan = RowDataSourceScanExec(
projects.map(_.toAttribute),
relation.output,
requestedColumns.map(relation.output.indexOf),
pushedFilters.toSet,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
relation.relation, relation.catalogTable.map(_.identifier))
Review comment (dongjoon-hyun, Member, Jul 11, 2017): nit; can we make this into two lines during refactoring?

    relation.relation,
    relation.catalogTable.map(_.identifier))

filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// A set of column attributes that are only referenced by pushed down filters. We can
// eliminate them from requested columns.
val handledSet = {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
AttributeSet(handledPredicates.flatMap(_.references)) --
(projectSet ++ unhandledSet).map(relation.attributeMap)
}
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = RowDataSourceScanExec(
requestedColumns,
relation.output,
requestedColumns.map(relation.output.indexOf),
pushedFilters.toSet,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
relation.relation, relation.catalogTable.map(_.identifier))
Review comment (Member): ditto.

execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
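A worked example of the pushed-filter column elimination that now lives only in this second branch (table and column names invented for illustration): for SELECT a FROM t WHERE b > 1, when GreaterThan(b, 1) is pushed down and fully handled by the source, column b is referenced only by a handled filter and never needs to be requested. The same subtraction in the first branch could never remove anything, because there filterSet is a subset of projectSet, which is the no-op the author points out above.

    // Worked example with invented names; attributes stand in as plain strings.
    val projectSet = Set("a")                       // columns the query projects
    val filterSet  = Set("b")                       // columns referenced by filters
    val handledSet = Set("b")                       // referenced only by pushed, handled filters
    val requestedColumns = (projectSet ++ filterSet) -- handledSet
    // requestedColumns == Set("a"): b is evaluated inside the data source.
    //
    // In the first branch filterSet is a subset of projectSet, so every handled-filter
    // column is also projected; handledSet ends up empty there and the old
    // .filterNot(handledSet.contains) call removed nothing, hence its deletion.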