[SPARK-24267][SQL] explicitly keep DataSourceReader in DataSourceV2Relation #21319
Conversation
```scala
  case _ =>
    throw new AnalysisException(s"Data source is not readable: $name")
}

private def asReadSupport: ReadSupport = source match {
```
Since I'm touching the code around here, I removed the outermost `{ }` and fixed the indentation. I can revert it if people think it's not worth it.
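For reference, the shape of the change is roughly this (an illustrative sketch with the match cases abbreviated, not the exact diff):

```scala
// Before: the match expression was wrapped in an extra outer block.
private def asReadSupport: ReadSupport = {
  source match {
    case support: ReadSupport => support
    case _ => throw new AnalysisException(s"Data source is not readable: $name")
  }
}

// After: the outermost { } is removed and the cases are re-indented one level.
private def asReadSupport: ReadSupport = source match {
  case support: ReadSupport => support
  case _ => throw new AnalysisException(s"Data source is not readable: $name")
}
```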
Test build #90573 has finished for PR 21319 at commit
```scala
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return it so Spark can plan an extra filter operator.
val postScanFilters =
  r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
```
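For context, the reverse map used in this snippet can be built along these lines. This is a reconstruction, not the actual code: `buildReverseMap` is a hypothetical name, and it assumes `DataSourceStrategy.translateFilter`, which converts a catalyst `Expression` into a data source `Filter` when possible:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources

// Remember which catalyst Expression each translated Filter came from, so the
// Filters that pushFilters() reports as post-scan can be mapped back.
def buildReverseMap(filters: Seq[Expression]): mutable.HashMap[sources.Filter, Expression] = {
  val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
  for (filterExpr <- filters) {
    DataSourceStrategy.translateFilter(filterExpr) match {
      case Some(translated) => translatedFilterToExpr(translated) = filterExpr
      case None => // untranslatable filters always stay as post-scan filters
    }
  }
  translatedFilterToExpr
}
```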
nit: indent
```scala
  case _ =>
    (Nil, Nil)
}

logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
```
Add logDebug in new code?
```scala
val projectAttrs = project.map(_.toAttribute)
val projectSet = AttributeSet(project.flatMap(_.references))
val filterSet = AttributeSet(filters.flatMap(_.references))

val postScanFilters: Seq[Expression] = newReader match {
```
Create a single function (e.g. `pushFilters`) for this? The whole function is quite long.
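For instance, the pushdown arm could be factored out like this (a hypothetical extraction; the name and signature are illustrative):

```scala
// Push filters into the reader if it supports pushdown. Returns the filters
// that must still be evaluated after the scan, plus the filters that were
// actually pushed to the source.
private def pushFilters(
    reader: DataSourceReader,
    filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
  reader match {
    case r: SupportsPushDownCatalystFilters =>
      val postScanFilters = r.pushCatalystFilters(filters.toArray)
      (postScanFilters, r.pushedCatalystFilters())
    // a SupportsPushDownFilters case would follow the same shape
    case _ =>
      (filters, Nil)
  }
}
```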
```scala
val newRelation = relation.copy(
  projection = projection.asInstanceOf[Seq[AttributeReference]],
  filters = Some(filters))

newReader match {
```
Move this into a new function too.
```scala
val postScanFilters: Seq[Expression] = newReader match {
  case r: SupportsPushDownCatalystFilters =>
    val postScanFilters = r.pushCatalystFilters(filters.toArray)
    newRelation.copy(pushedFilters = r.pushedCatalystFilters())
```
`newRelation = newRelation.copy(pushedFilters = pushedFilters)`

Maybe make `pushedFilters` the return value of the `match` and do the copy after the `match { }`.
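In other words, something like this (a sketch of the suggested restructuring; it assumes `newRelation` is reassignable here and that the relation has a `pushedFilters` field):

```scala
// Let the match return the pushed filters, and copy the relation once after it.
val (postScanFilters, pushedFilters) = newReader match {
  case r: SupportsPushDownCatalystFilters =>
    val postScan = r.pushCatalystFilters(filters.toArray)
    (postScan.toSeq, r.pushedCatalystFilters().toSeq)
  case _ =>
    (filters, Nil)
}
newRelation = newRelation.copy(pushedFilters = pushedFilters)
```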
```scala
// postScanFilters: filters that need to be evaluated after the scan.
// pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
// Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
```
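To make the overlap concrete: a source like Parquet can accept a predicate for row-group pruning and still require it after the scan, because pruning skips whole row groups rather than individual rows. A tiny illustration using the public `Filter` API (the predicate is made up):

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan}

// Row-group statistics let the source skip groups where no row can satisfy
// a > 1, but rows inside the surviving groups still need the check re-applied.
val pushedFilters: Seq[Filter] = Seq(GreaterThan("a", 1))
val postScanFilters: Seq[Filter] = Seq(GreaterThan("a", 1))
assert(pushedFilters.toSet.intersect(postScanFilters.toSet).nonEmpty)
```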
Nit: There can be overlap between `postScanFilters` and `pushedFilters`.
LGTM
Test build #90644 has finished for PR 21319 at commit
@cloud-fan, is this necessary if we make the stats changes needed to move push-down to when the logical plan is converted to the physical plan? I don't think it will be, because then we don't create a reader at all. If that's the case, I'd much rather see the focus on moving stats to the physical plan instead, so we don't have to hack around it.
Yea, this is still a workaround until we have stats in the physical plan. My major concern is that moving stats to the physical plan is a huge change, and I may not have enough free time to drive it. It would be great if @wzhfy could drive it, but I'm not confident we can finish it before Spark 2.4. I think we should not be blocked by this, and I hope we can have a good data source v2 operator pushdown story in Spark 2.4. That is my major motivation for these refactorings/cleanups.
Test build #90933 has finished for PR 21319 at commit
retest this please
Test build #90959 has finished for PR 21319 at commit
@cloud-fan, what about adding support for v2 pushdown in the stats visitor instead? Here's the idea: when the visitor hits a `DataSourceV2Relation`, it can create a reader, run the push-down, and report stats from that configured reader. That would give us the ability to do pushdown on conversion to physical plan, but we'd get correct stats. The only drawback is that we would temporarily make the stats code a bit larger, but at least it is separate, so we can get the logical plans right and fix stats on a separate schedule.
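A rough sketch of that idea (`newReader` and the elided pushdown step are assumptions for illustration, not the actual API):

```scala
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics

def visitV2Relation(plan: LogicalPlan): Statistics = plan match {
  case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    // Build a throwaway reader and run pushdown on it; the reader is used
    // only to answer the stats question and is never stored in the plan.
    val reader = relation.newReader()
    // ... push `filters` and prune to `project` here, as the planner does ...
    reader match {
      case r: SupportsReportStatistics =>
        Statistics(sizeInBytes = BigInt(r.getStatistics.sizeInBytes.orElse(Long.MaxValue)))
      case _ => plan.stats // fall back to the default size estimation
    }
  case _ => plan.stats
}
```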
The problem is that the plan visitor can only visit the plan, not change it, and pushing operators down to the data source needs to remove filters from the plan...
I'll open a PR to demonstrate what I'm proposing. It wouldn't change the plan; it would use a reader and push-down to report stats correctly as a temporary fix. I'm -1 on this PR. I'll have to take a closer look if needed, but on a first pass I see that it breaks equality by relying on a user-supplied reader instance. The problems this addresses are still workarounds for the fact that the reader is carried in the logical plan, so I don't think we should make these changes.
Can you say more about this? I explicitly mentioned it in the PR description.
Anyway, I think moving statistics to the physical plan is the ultimate solution; all the others are workarounds, so we should pick the simplest one. I'm glad to take your stats visitor approach if it's simpler.
Here's the commit with my changes to support v2 stats in the visitor; sorry it took so long for me to find the time! The stats visitor now matches on `PhysicalOperation` to get accurate stats for v2, or for any other data source that wants to report more accurate stats that aren't otherwise available because push-down happens when converting from the logical plan to the physical plan.
Hi @rdblue, I looked into the plan visitor approach, but I'm struggling with some problems.
It would be great if you could share some ideas about it. In the meantime, can we unblock this cleanup PR? We can open a new PR for the plan visitor approach when it's ready.
Test build #91680 has finished for PR 21319 at commit
retest this please
Test build #91685 has finished for PR 21319 at commit
retest this please
Test build #91722 has finished for PR 21319 at commit
…physical conversion

## What changes were proposed in this pull request?

This is a followup of #21503, to completely move operator pushdown to the planner rule. The code is mostly from #21319.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21574 from cloud-fan/followup.
What changes were proposed in this pull request?

To work around the issue that Spark only has statistics in the logical plan, we need to create the `DataSourceReader` at the logical phase so that we can report statistics. Currently this created `DataSourceReader` is kept as a `lazy val` in `DataSourceV2Relation`. If we think about how `lazy val` is implemented in Scala, we actually keep a `DataSourceReader` instance in `DataSourceV2Relation` and exclude it when defining equality of `DataSourceV2Relation`.

This works, but has 2 problems:
1. When `DataSourceV2Relation` gets transformed and returns a new copy, we will re-do the pushdown and re-create the `DataSourceReader`. This is not a problem now, but we may encounter such cases in the future.
2. The same trick has to be repeated for `StreamingDataSourceV2Relation`.

This PR proposes to implement the `lazy val` by ourselves: keep `DataSourceReader` as an optional parameter in the constructor of `DataSourceV2Relation`, exclude it from the equality definition, but include it when copying.

Moving statistics to physical plans would be best, but it's a huge topic and won't be covered in this PR.
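The pattern in miniature (a simplified sketch of the idea, not the actual class, which is a catalyst `LeafNode` with more fields; `existingReader` and `withReader` are illustrative names):

```scala
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

case class DataSourceV2Relation(
    source: DataSourceV2,
    options: Map[String, String],
    // the hand-rolled "lazy val": absent until the reader is first created
    existingReader: Option[DataSourceReader] = None) {

  // Reuse the carried reader if there is one; otherwise create it
  // (pushdown would happen inside createReader in the real class).
  lazy val reader: DataSourceReader = existingReader.getOrElse(createReader())

  // Excluded from equality: relations compare by source and options only.
  override def equals(other: Any): Boolean = other match {
    case o: DataSourceV2Relation => source == o.source && options == o.options
    case _ => false
  }
  override def hashCode(): Int = (source, options).hashCode()

  // Included when copying: a transformed copy keeps the created reader,
  // so pushdown is not re-done.
  def withReader: DataSourceV2Relation = copy(existingReader = Some(reader))

  private def createReader(): DataSourceReader = ??? // elided in this sketch
}
```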
How was this patch tested?
existing tests