[SPARK-24478][SQL] Move projection and filter push down to physical conversion

This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.
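
A rough sketch of what "pushing when converting to a physical plan" can look like is below. It is only an illustration, not code from this commit: PushDownSketch and its two helpers are hypothetical names, although SupportsPushDownFilters and SupportsPushDownRequiredColumns are the existing DataSourceReader mix-ins that a planner-side push-down would call.

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of this commit: shows the shape of pushing
// filters and required columns into a v2 reader while building the physical plan.
object PushDownSketch {
  // Ask the reader to handle as many filters as it can; the returned filters
  // are the ones the source could not evaluate, so Spark must still apply them.
  def pushFilters(reader: DataSourceReader, filters: Seq[Filter]): Seq[Filter] = reader match {
    case r: SupportsPushDownFilters => r.pushFilters(filters.toArray).toSeq
    case _ => filters
  }

  // Ask the reader to prune its output to only the required columns, if supported.
  def pruneColumns(reader: DataSourceReader, requiredSchema: StructType): Unit = reader match {
    case r: SupportsPushDownRequiredColumns => r.pruneColumns(requiredSchema)
    case _ => // reader keeps producing its full schema
  }
}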

A side-effect of this change is that the stats from the logical (optimized) plan no longer reflect pushed filters and projection. This is a temporary state, until the planner gathers stats from the physical plan instead. An alternative to this approach is 9d3a11e.
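
For context on what those statistics look like on the source side, the fragment below sketches a reader that mixes in SupportsReportStatistics. It is illustrative only: ExampleStatsReporting and its numbers are made up, and the rest of the reader is omitted. After this change such statistics are consulted before any projection or filters are pushed, so they describe the unfiltered table.

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{Statistics, SupportsReportStatistics}

// Hypothetical fragment, not from this commit: only the statistics part of a
// v2 reader is shown; readSchema and the read factories are omitted.
trait ExampleStatsReporting extends SupportsReportStatistics {
  // Consulted before push-down, so report totals for the whole table.
  override def getStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(128L * 1024 * 1024)
    override def numRows(): OptionalLong = OptionalLong.of(1000000L)
  }
}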

The first commit was proposed in apache#21262. This PR replaces apache#21262.

Existing tests cover this change.

Author: Ryan Blue <blue@apache.org>

Closes apache#21503 from rdblue/SPARK-24478-move-push-down-to-physical-conversion.
rdblue committed Aug 29, 2018
1 parent 0ba0844 commit ba436c0
Showing 10 changed files with 210 additions and 244 deletions.
QueryPlan.scala
@@ -363,7 +363,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   /**
    * Canonicalized copy of this query plan.
    */
-  protected lazy val canonicalized: PlanType = this
+  lazy val canonicalized: PlanType = this
 
   /**
    * Returns true when the given query plan will return the same results as this query plan.
SupportsReportStatistics.java
@@ -22,6 +22,11 @@
 /**
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report statistics to Spark.
+ *
+ * Statistics are reported to the optimizer before a projection or any filters are pushed to the
+ * DataSourceReader. Implementations that return more accurate statistics based on projection and
+ * filters will not improve query performance until the planner can push operators before getting
+ * stats.
  */
 @InterfaceStability.Evolving
 public interface SupportsReportStatistics extends DataSourceReader {
DataFrameReader.scala
@@ -149,22 +149,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
-      val (pathOption, tableOption) = extraOptions.get("path") match {
+      val options: Map[String, String] = extraOptions.get("path") match {
         case Some(path) if !path.contains("/") =>
           // without "/", this cannot be a full path. parse it as a table name
           val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(path)
           // ensure the database is set correctly
-          val db = ident.database.getOrElse(sparkSession.catalog.currentDatabase)
-          (None, Some(ident.copy(database = Some(db))))
-        case Some(path) =>
-          (Some(path), None)
+          (extraOptions ++ Map(
+            "database" -> ident.database.getOrElse(sparkSession.catalog.currentDatabase),
+            "table" -> ident.table)).toMap
         case _ =>
-          (None, None)
+          extraOptions.toMap
       }
 
-      Dataset.ofRows(sparkSession, DataSourceV2Relation(
-        source, extraOptions.toMap, pathOption, tableOption,
-        userSpecifiedSchema = userSpecifiedSchema))
+      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
+        source, options, userSpecifiedSchema = userSpecifiedSchema))
 
     } else {
       // Code path for data source v1.
DataFrameWriter.scala
@@ -224,25 +224,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     // save variants always match columns by name
     extraOptions.put("matchByName", "true")
 
-    val (pathOption, tableOption) = extraOptions.get("path") match {
+    val options: Map[String, String] = extraOptions.get("path") match {
       case Some(path) if !path.contains("/") =>
         // without "/", this cannot be a full path. parse it as a table name
         val ident = df.sparkSession.sessionState.sqlParser.parseTableIdentifier(path)
         // ensure the database is set correctly
-        val db = ident.database.getOrElse(df.sparkSession.catalog.currentDatabase)
-        (None, Some(ident.copy(database = Some(db))))
-      case Some(path) =>
-        (Some(path), None)
+        (extraOptions ++ Map(
+          "database" -> ident.database.getOrElse(df.sparkSession.catalog.currentDatabase),
+          "table" -> ident.table)).toMap
       case _ =>
-        (None, None)
+        extraOptions.toMap
     }
 
     val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
-    val relation = DataSourceV2Relation(dataSource, extraOptions.toMap, pathOption, tableOption)
+    val relation = DataSourceV2Relation.create(dataSource, options)
 
     val (overwrite, ifNotExists) = mode match {
       case SaveMode.Ignore =>
-        if (relation.writer(df.logicalPlan.schema, mode).isEmpty) {
+        if (relation.newWriter(df.logicalPlan.schema, mode).isEmpty) {
           return
         }
         (false, false)
SparkOptimizer.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2PushDown
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 import org.apache.spark.sql.internal.SQLConf
 
@@ -35,6 +34,5 @@ class SparkOptimizer(
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Push down operators to data source scan", Once, DataSourceV2PushDown) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}