
[SPARK-24267][SQL] explicitly keep DataSourceReader in DataSourceV2Relation #21319

Closed · wants to merge 4 commits

Conversation

cloud-fan (Contributor) commented on May 14, 2018:

What changes were proposed in this pull request?

To work around the issue that Spark only has statistics in the logical plan, we need to create the DataSourceReader during the logical phase so that we can report statistics. Currently this DataSourceReader is kept as a lazy val in DataSourceV2Relation. If we think about how a lazy val is implemented in Scala, we actually keep a DataSourceReader instance in DataSourceV2Relation and exclude it when defining the equality of DataSourceV2Relation.
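For context, a Scala lazy val roughly desugars to a hidden cached field guarded by an initialization flag, so the instance holds the computed reader either way. A simplified sketch with hypothetical types, not the actual compiler output:

```scala
// Roughly what `lazy val reader = createReader()` compiles to:
// the value is computed once, then cached in a hidden field.
class RelationLike(createReader: () => AnyRef) {
  private var _reader: AnyRef = _       // hidden cached field
  private var _initialized = false      // initialization flag

  def reader: AnyRef = this.synchronized {  // lazy vals are thread-safe
    if (!_initialized) {
      _reader = createReader()
      _initialized = true
    }
    _reader
  }
}
```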

This works, but has two problems:

  1. After the pushdown rule, if DataSourceV2Relation gets transformed and returns a new copy, we will redo the pushdown and re-create the DataSourceReader. This is not a problem now, but we may encounter such cases in the future.
  2. It's more intuitive to put the operator pushdown logic in the rule. When adding pushdown support for streaming relations, it's easier to share the pushdown code between batch and streaming data sources if the pushdown logic is in the rule. Otherwise we may need to duplicate the pushdown logic in StreamingDataSourceV2Relation.

This PR proposes to implement the lazy val ourselves: keep DataSourceReader as an optional parameter in the constructor of DataSourceV2Relation, exclude it from the equality definition, but include it when copying.
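A minimal sketch of this pattern with stand-in types (the real code uses DataSourceV2Relation and DataSourceReader):

```scala
// Stand-in relation; `reader` caches the pushdown result.
case class RelationSketch(
    source: String,
    output: Seq[String],
    reader: Option[AnyRef] = None) {  // None until the pushdown rule runs

  // Equality excludes `reader`, mirroring the old lazy-val behavior.
  override def equals(other: Any): Boolean = other match {
    case r: RelationSketch => source == r.source && output == r.output
    case _ => false
  }

  override def hashCode(): Int = (source, output).hashCode()

  // The compiler-generated copy(...) carries `reader` along unless it is
  // explicitly overridden, so the cached reader survives transformations.
}
```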

Moving statistics to physical plans would be best, but it's a huge topic and won't be covered in this PR.

How was this patch tested?

existing tests

```scala
  case _ =>
    throw new AnalysisException(s"Data source is not readable: $name")
}

private def asReadSupport: ReadSupport = source match {
```
cloud-fan (Contributor, Author) commented on May 14, 2018:

Since I'm touching the code around here, I removed the outermost { } and fixed the indentation. I can revert this if people think it's not worth it.


SparkQA commented on May 14, 2018:

Test build #90573 has finished for PR 21319 at commit ca6ccb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return it so Spark can plan an extra filter operator.
val postScanFilters =
  r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
```
A reviewer (Member) commented:

nit: indent

```scala
  case _ =>
    (Nil, Nil)
}
logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
```
A reviewer (Member) commented:

Add logDebug in new code?

```scala
val projectAttrs = project.map(_.toAttribute)
val projectSet = AttributeSet(project.flatMap(_.references))
val filterSet = AttributeSet(filters.flatMap(_.references))
val postScanFilters: Seq[Expression] = newReader match {
```
A reviewer (Member) commented:

Create a single function (e.g. pushFilters) for this? The whole function is quite long.

```scala
val newRelation = relation.copy(
  projection = projection.asInstanceOf[Seq[AttributeReference]],
  filters = Some(filters))
newReader match {
```
A reviewer (Member) commented:

Move this into a new function too.

```scala
val postScanFilters: Seq[Expression] = newReader match {
  case r: SupportsPushDownCatalystFilters =>
    val postScanFilters = r.pushCatalystFilters(filters.toArray)
    newRelation.copy(pushedFilters = r.pushedCatalystFilters())
```
A reviewer (Member) commented:

```scala
newRelation = newRelation.copy(pushedFilters = pushedFilters)
```

Maybe make pushedFilters the return result of the match and do the copy after the match { } block.
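In other words, a sketch against this diff (reusing its names, which are assumptions here):

```scala
// Return both filter sets from the match, then copy the relation once.
val (postScanFilters, pushedFilters) = newReader match {
  case r: SupportsPushDownCatalystFilters =>
    (r.pushCatalystFilters(filters.toArray).toSeq,
      r.pushedCatalystFilters().toSeq)
  case _ =>
    (Nil, Nil)
}
val newRelation = relation.copy(pushedFilters = pushedFilters)
```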


```scala
// postScanFilters: filters that need to be evaluated after the scan.
// pushedFilters: filters that will be pushed down and evaluated in the
//                underlying data sources.
// Note: postScanFilters and pushedFilters can overlap, e.g. the parquet
//       row group filter.
```
A reviewer (Member) commented:

Nit: There can be overlap between postScanFilters and pushedFilters

gengliangwang (Member) left a review:

LGTM

SparkQA commented on May 15, 2018:

Test build #90644 has finished for PR 21319 at commit 65d5664.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue (Contributor) commented on May 21, 2018:

@cloud-fan, is this necessary if we make the stats changes needed to move push-down to when the logical plan is converted to the physical plan? I don't think it will be, because then we don't create a reader at all. If that's the case, I'd much rather see us focus on moving stats to the physical plan instead, so we don't have to hack around it.

cloud-fan (Contributor, Author) commented:

Yea, this is still a workaround until we have stats in the physical plan. My major concern is that moving stats to the physical plan is a huge change and I may not have enough free time to drive it. It would be great if @wzhfy could drive it, but I'm not confident we can finish it before Spark 2.4.

I think we should not be blocked by this, and I hope we can have a good data source v2 operator pushdown story in Spark 2.4. That's my major motivation for these refactorings/cleanups.

SparkQA commented on May 22, 2018:

Test build #90933 has finished for PR 21319 at commit 3c5d175.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author) commented:

retest this please

SparkQA commented on May 22, 2018:

Test build #90959 has finished for PR 21319 at commit 3c5d175.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue (Contributor) commented on May 23, 2018:

@cloud-fan, what about adding support for v2 pushdown in the stats visitor instead?

Here's the idea: when the visitor hits a Filter or a Project, it tries to match the plan using PhysicalOperation. If that works and the underlying relation is a DataSourceV2Relation, it does the pushdown to configure a reader and returns stats from it.

That would give us the ability to do pushdown on conversion to the physical plan, but we'd still get correct stats. The only drawback is that we would temporarily make the stats code a bit larger, but at least it is separate, so we can get the logical plans right and fix stats on a separate schedule.
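Sketched with stand-in types (hypothetical names, not the actual Spark API), the idea looks like:

```scala
// Simplified stand-ins for LogicalPlan, DataSourceV2Relation, and the reader.
sealed trait Plan
case class Filter(condition: String, child: Plan) extends Plan
case class V2Relation(newReader: () => Reader, defaultSize: Long) extends Plan

trait Reader {
  def pushFilters(filters: Seq[String]): Unit
  def estimatedSizeInBytes: Long
}

object StatsVisitor {
  // The visitor never rewrites the plan; it only configures a throwaway
  // reader so the statistics reflect the pushdown.
  def sizeInBytes(plan: Plan): Long = plan match {
    // Stand-in for matching the plan with PhysicalOperation.
    case Filter(condition, V2Relation(newReader, _)) =>
      val reader = newReader()           // fresh reader, used only for stats
      reader.pushFilters(Seq(condition)) // pushdown happens on the side
      reader.estimatedSizeInBytes
    case Filter(_, child) => sizeInBytes(child)
    case V2Relation(_, defaultSize) => defaultSize
  }
}
```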

cloud-fan (Contributor, Author) commented:

The problem is that the plan visitor can only visit the plan, not change it, and pushing operators down to the data source requires removing filters from the plan...

rdblue (Contributor) commented on May 26, 2018:

I'll open a PR to demonstrate what I'm proposing. It wouldn't change the plan; it would use a reader and push-down to report stats correctly as a temporary fix.

I'm -1 on this PR. I'll have to take a closer look if needed, but on a first pass I see that it breaks equality by relying on a user-supplied reader instance. The problems this addresses are still workarounds for the fact that the reader is carried in the logical plan, so I don't think we should make these changes.

cloud-fan (Contributor, Author) commented:

> but in the first pass I see that it breaks equality by relying on a user-supplied reader instance.

Can you say more about this? I explicitly said in the PR description:

> keep DataSourceReader as an optional parameter in the constructor of DataSourceV2Relation, exclude it in the equality definition but include it when copying.

cloud-fan (Contributor, Author) commented:

Anyway, I think moving statistics to the physical plan is the ultimate solution; everything else is a workaround, so we should pick the simplest one. I'm glad to take your stats visitor approach if it's simpler.

rdblue (Contributor) commented on Jun 6, 2018:

Here's the commit with my changes to support v2 stats in the visitor; sorry it took so long for me to find the time: 9d3a11e

The stats visitor now matches on PhysicalOperation to get accurate stats for v2, or for any other data source that wants to report more accurate stats than would otherwise be available because push-down happens when converting from the logical plan to the physical plan.

cloud-fan (Contributor, Author) commented:

Hi @rdblue, I looked into the plan visitor approach, but was struggling with some problems:

  1. How do we pass the DataSourceReader to the physical plan? We don't want to apply operator pushdown again during planning; that's why we made DataSourceReader a lazy val in DataSourceV2Relation, to effectively cache the pushdown result and hand the DataSourceReader to the physical plan directly.
  2. How do we eliminate unneeded operators like pushed filters? We currently just change the logical plan to eliminate these operators. Alternatively, we could do that during planning, if we eventually move the stats to the physical plan. I feel it's hard to make the plan visitor change the plan.

It would be great if you could share some ideas about this. In the meantime, can we unblock this cleanup PR? We can have a new PR for the plan visitor approach when it's ready.

SparkQA commented on Jun 12, 2018:

Test build #91680 has finished for PR 21319 at commit 91fdedc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author) commented:

retest this please

SparkQA commented on Jun 12, 2018:

Test build #91685 has finished for PR 21319 at commit 91fdedc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author) commented:

retest this please

SparkQA commented on Jun 12, 2018:

Test build #91722 has finished for PR 21319 at commit 91fdedc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan closed this on Jun 15, 2018.
asfgit pushed a commit that referenced this pull request Jun 19, 2018
…physical conversion

## What changes were proposed in this pull request?

This is a followup of #21503, to completely move operator pushdown to the planner rule.

The code is mostly from #21319.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21574 from cloud-fan/followup.