[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter #10427
Conversation
Test build #48159 has finished for PR 10427 at commit
BTW, a role of
Test build #48161 has finished for PR 10427 at commit
val doFilterPushdown = (df: DataFrame) => {
  val schema = df.schema
  val parentPlan = df.queryExecution.executedPlan
  assert(parentPlan.isInstanceOf[PhysicalRDD])
Should we maybe add some comments or messages to explain what this assert means?
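For instance, the assertion could carry a message that explains itself on failure. A minimal self-contained sketch, using hypothetical stand-in types rather than the real SparkPlan and PhysicalRDD classes:

```scala
// Hypothetical stand-ins for SparkPlan / PhysicalRDD, just to show the
// shape of an assertion with a descriptive failure message.
sealed trait SparkPlanLike
case class PhysicalRDDLike(desc: String) extends SparkPlanLike
case class FilterLike(child: SparkPlanLike) extends SparkPlanLike

def assertFilterPushedDown(plan: SparkPlanLike): Unit =
  assert(plan.isInstanceOf[PhysicalRDDLike],
    s"Expected a bare PhysicalRDD scan (i.e. the Filter was pushed down), but got: $plan")
```

With a message like this, a regression in filter pushdown fails with the offending plan printed instead of a bare assertion error.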
Added the comment
cc @liancheng @yhuai
Force-pushed from 9ead74a to a910a7e
def checkPlan(df: DataFrame): DataFrame = {
  val parentPlan = df.queryExecution.executedPlan
  // Check if SparkPlan Filter is removed in a physical plan and
  // the plan only has PhysicalRDD to scan JDBCRelation.
A nit: please wrap the Scala class names (SparkPlan, Filter, and JDBCRelation) with [[...]] in comments.
Actually you don't need to do so. [[...]] notation is only used in ScalaDoc for adding links. It's not effective in inline comments.
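For reference, the distinction being discussed could be illustrated like this (a hypothetical snippet; the class referenced is just an example):

```scala
/**
 * In a ScalaDoc comment, [[scala.collection.immutable.List]] is rendered
 * as a link in the generated API documentation.
 */
object DocExample {
  // In an inline comment like this one, [[List]] stays literal text,
  // so the double brackets add nothing.
  val xs: List[Int] = List(1, 2, 3)
}
```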
We need the wrap for ScalaDoc, but do we also need it for plain comments? There are comments that have no wrap for Spark classes, e.g.:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L558
Yeah, that's what I mean. Double brackets are not required for inline comments (i.e., here).
I see. Thanks! That was my bad.
@liancheng would you tell me what you think of this? I made some commits locally and want to be sure they are structurally consistent with this PR before creating new PRs for them.
Test build #48188 has finished for PR 10427 at commit
case IsNull(_) | IsNotNull(_) => true
case _ => false
}
We can make JDBCRDD.compileFilter private[jdbc], and then simplify this one to compileFilter(filter) != null.
Oh, smart idea. Fixed.
Test build #48187 has finished for PR 10427 at commit
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filterNot(JDBCRDD.compileFilter(_) != null)
Nit: It's a little bit weird to use double-negation here when filters.filter(JDBCRDD.compileFilter(_) == null) is perfectly OK.
Okay, I'll fix it.
Test build #48201 has finished for PR 10427 at commit
Overall LGTM. Would like to ask @yhuai to have a look, though.
Let me leave one more comment. I tested some cases with this PR and it looks generally working fine. But I would like to mention one thing that I am pretty sure you folks already know, though. Since now . For example, in the case of Parquet, I implemented this locally and tested it, and found it calculating a wrong result. That produced the plans below:
Here, I could find any . I tried with the original code and it showed the plan below, assigning
The reason seems to be that it adds all the columns in projects and filters to . For this
So please take this into account. Actually, I think it should include the columns in pushed filters for
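For context, the issue described above can be sketched with a simplified mirror of the data source scan contract (hypothetical self-contained types, not the real org.apache.spark.sql.sources classes): a PrunedFilteredScan-style source receives the projected columns and the pushed filters separately, so a source that prunes columns strictly to the projection can drop a column that one of its handled filters still needs.

```scala
// Hypothetical simplified mirror of the filter type; the real one is
// org.apache.spark.sql.sources.Filter.
sealed trait Filter { def references: Seq[String] }
case class EqualTo(attribute: String, value: Any) extends Filter {
  def references: Seq[String] = Seq(attribute)
}

// A defensive source reads the union of projected columns and filter
// columns, so a filter it evaluates never touches a pruned-away column.
def columnsToRead(requiredColumns: Array[String], filters: Array[Filter]): Array[String] =
  (requiredColumns ++ filters.flatMap(_.references)).distinct
```

Under this sketch, a query projecting only column a but filtering on column b would still read both columns.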
Force-pushed from c362c91 to da07ac5
Test build #48277 has finished for PR 10427 at commit
@HyukjinKwon ISTM the root problem is that Catalyst cannot fill required columns on-the-fly for data sources. The interface
@maropu I believe it might be about Parquet stuff. AFAIK, the columns in filters should be set to . Actually, I am now talking with @yhuai to check if it is really an issue. For me, I think we can simply include the columns for pushed filters as well, just for safety.
If this patch is merged, the comment below is not suitable for . Is it okay to fix it?
And you are right, I think the comment I made is not related to this PR. Let's wait for their comments!
@HyukjinKwon Is adding a special handling for Parquet better? I think that the required columns for push-down filters should be passed to each data source, and this is a more natural way.
@maropu I agree that can be another way! But I just think an interface should be inclusive, not exclusive. Handling in
@HyukjinKwon Anyway, I added a test reflecting this discussion. Thanks.
@yhuai ping
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filter(JDBCRDD.compileFilter(_) == null)
Seems you want to use isEmpty?
@yhuai fixed.
Test build #50818 has finished for PR 10427 at commit
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
Thank you for the update!
When we were using == null (I guess all filters were marked as unhandled, right?), all tests still passed. So, I am wondering if existing tests are sufficient?
Added tests. Though, ISTM that JDBCRDD.compileFilter does not return None for a given Filter, because the function can compile all the Filters implemented in sql.sources.filters.
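The relationship between compileFilter and unhandledFilters described above can be sketched with a toy version (hypothetical simplified types; the real JDBCRDD.compileFilter covers the full sql.sources.Filter hierarchy and returns the compiled WHERE fragment):

```scala
// Hypothetical toy filter hierarchy, including one filter the
// compiler cannot handle, to make unhandledFilters observable.
sealed trait Filter
case class EqualTo(attr: String, value: Any) extends Filter
case class IsNull(attr: String) extends Filter
case class UnknownFilter() extends Filter

// Toy compileFilter: Some(sqlFragment) when the filter translates to a
// JDBC WHERE clause, None otherwise.
def compileFilter(f: Filter): Option[String] = f match {
  case EqualTo(attr, value) => Some(s"$attr = '$value'")
  case IsNull(attr)         => Some(s"$attr IS NULL")
  case _                    => None
}

// unhandledFilters keeps exactly those filters the source cannot compile,
// so Spark retains a Filter node only for them.
def unhandledFilters(filters: Array[Filter]): Array[Filter] =
  filters.filter(compileFilter(_).isEmpty)
```

Since every filter in the toy hierarchy except UnknownFilter compiles, unhandledFilters returns empty arrays for the others, which mirrors why a None-returning case is hard to exercise against the real filter set.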
Test build #50904 has finished for PR 10427 at commit
df
}
sql("SELECT * FROM foobar WHERE THEID < 1").explain(true)
sql("SELECT * FROM foobar WHERE (THEID + 1) < 2").explain(true)
Remove these two lines?
Thank you for the update! Overall looks good. There are two lines of unnecessary changes. Let's remove them and we can merge it once it passes Jenkins.
Force-pushed from 2b9c766 to 7038bc0
Test build #50968 has finished for PR 10427 at commit
Force-pushed from 7038bc0 to 7a7b9fa
Test build #50976 has finished for PR 10427 at commit
@yhuai okay, ready to commit.
LGTM. Merging to master.
…emoving unnecessary Spark Filter apache#10427

Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'

Current plan:

This patch enables a plan below: