
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter #10427

Closed · wants to merge 12 commits into master from RemoveFilterInJdbcScan

Conversation

maropu (Member) commented Dec 22, 2015

Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'

Current plan:

== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
+- Filter (col0#0 = xxx)
   +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]

This patch enables the plan below:

== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;@2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
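
For context: unhandledFilters is the BaseRelation hook that lets a source tell the planner which pushed filters it cannot fully apply, so Spark keeps a Filter node only for those. A minimal sketch of how a relation opts in (SomeJdbcLikeRelation and canEvaluate are hypothetical names, not Spark code):

import org.apache.spark.sql.sources.{BaseRelation, Filter}

// Minimal sketch, not Spark source: the default implementation in
// BaseRelation returns `filters` unchanged (everything unhandled), which
// forces Spark to keep its own Filter above the scan. Overriding it to
// return only the filters the source cannot evaluate lets the planner
// drop the redundant Spark-side Filter, as in the plans above.
abstract class SomeJdbcLikeRelation extends BaseRelation {
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filter(f => !canEvaluate(f))

  // Hypothetical: whether this source can apply `f` itself.
  protected def canEvaluate(f: Filter): Boolean
}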

maropu changed the title from "Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter" to "[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter" on Dec 22, 2015
SparkQA commented Dec 22, 2015

Test build #48159 has finished for PR 10427 at commit c129f28.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member Author) commented Dec 22, 2015

BTW, the role of PrunedFilteredScan overlaps with that of BaseRelation#unhandledFilters; that is, BaseRelation#unhandledFilters does the same thing as PrunedFilteredScan.
I think we should rename PrunedFilteredScan to something else, e.g., DbScan.
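
For reference, the overlap in question: PrunedFilteredScan already receives the pushed filters in buildScan (applied best-effort), while unhandledFilters declares which of them still need a Spark-side Filter. A sketch of the two shapes, paraphrased from sql.sources.interfaces rather than quoted verbatim:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

trait PrunedFilteredScan {
  // May apply `filters` opportunistically; rows that fail them can still
  // be returned, because Spark re-applies unhandled filters itself.
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

abstract class BaseRelation {
  // Contract: every filter NOT returned here must be fully applied by
  // the source; the returned ones keep a Spark-side Filter.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
  // ... sqlContext, schema, and other members elided
}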

SparkQA commented Dec 22, 2015

Test build #48161 has finished for PR 10427 at commit c2c6e20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val doFilterPushdown = (df: DataFrame) => {
  val schema = df.schema
  val parentPlan = df.queryExecution.executedPlan
  assert(parentPlan.isInstanceOf[PhysicalRDD])
Member: Should we add a comment or message explaining what this assert means?

maropu (Member Author): Added the comment.

HyukjinKwon (Member): cc @liancheng @yhuai

def checkPlan(df: DataFrame): DataFrame = {
  val parentPlan = df.queryExecution.executedPlan
  // Check if SparkPlan Filter is removed in a physical plan and
  // the plan only has PhysicalRDD to scan JDBCRelation.
Member: A nit: please wrap Scala class names (SparkPlan, Filter, and JDBCRelation) in [[ ]] in comments.

Contributor: Actually, you don't need to do so. The [[...]] notation is only used in ScalaDoc for adding links; it has no effect in inline comments.

maropu (Member Author): We need the wrapping for ScalaDoc, but do we also need it in plain comments? There are comments that don't wrap Spark classes:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L558

Contributor: Yeah, that's what I mean. Double brackets are not required for inline comments (i.e., here).

Member: I see. Thanks! That was my bad.

HyukjinKwon (Member): @liancheng Would you tell me what you think of this? I made some commits locally and want to make sure they are structurally consistent with this PR before creating new PRs for them.

SparkQA commented Dec 22, 2015

Test build #48188 has finished for PR 10427 at commit a910a7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  case IsNull(_) | IsNotNull(_) => true
  case _ => false
}
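The fragment above is the tail of a whitelist-style match over sql.sources filters; a hypothetical reconstruction of the predicate it likely belongs to (the name canBePushed and the exact case list are assumptions, not the PR's code):

import org.apache.spark.sql.sources._

object FilterWhitelist {
  // Hypothetical reconstruction, for context only; the PR later replaced
  // this whitelist with a call to JDBCRDD.compileFilter, as suggested below.
  def canBePushed(f: Filter): Boolean = f match {
    case EqualTo(_, _) => true
    case LessThan(_, _) | GreaterThan(_, _) => true
    case LessThanOrEqual(_, _) | GreaterThanOrEqual(_, _) => true
    case IsNull(_) | IsNotNull(_) => true
    case _ => false
  }
}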

Contributor: We can make JDBCRDD.compileFilter private[jdbc], and then simplify this to compileFilter(filter) != null.

maropu (Member Author): Oh, smart idea. Fixed.

SparkQA commented Dec 22, 2015

Test build #48187 has finished for PR 10427 at commit 9ead74a.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filterNot(JDBCRDD.compileFilter(_) != null)
Contributor: Nit: it's a little bit weird to use double negation here when filters.filter(JDBCRDD.compileFilter(_) == null) is perfectly OK.

maropu (Member Author): Okay, I'll fix it.

SparkQA commented Dec 22, 2015

Test build #48201 has finished for PR 10427 at commit c362c91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

liancheng (Contributor): Overall LGTM. Would like to ask @yhuai to have a look, though.

HyukjinKwon (Member): Let me leave one more comment. I tested some cases with this PR, and it looks generally fine. But I would like to mention one thing that I am pretty sure you folks already know.

Since the Spark-side Filter is now removed, operations such as count(), which do not require any columns, behave differently internally when filtering. This looks okay for JDBC, but not for other sources such as Parquet.

For example, in the case of Parquet, I implemented this locally, tested it, and found it computing a wrong result. It produced the plan below:

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#10L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#14L])
      +- Project
         +- Scan ParquetRelation[] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-bc91cad7-b7c6-4ef0-985d-abaad921572d/part=1, PushedFilters: [EqualTo(a,2)]

Here, no requiredColumns are given, yet filters on some columns are still pushed; with the Spark-side Filter gone, Parquet produces wrong results.

I tried the original code, and it showed the plan below, assigning requiredColumns properly:

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#10L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#14L])
      +- Project
         +- Filter (a#8 = 2)
            +- Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]

The reason seems to be that Spark adds all the columns referenced in projects and filters to requiredColumns, but does not add the columns referenced only by pushed filters. So the scan ends up with no columns once Project and Filter are removed, and Parquet internally tries to filter on columns that were never requested. As you know, the columns in filters must be included in the requestedSchema for Parquet to filter rows properly.

For this JDBCRelation, I tested and saw count() working fine with filtering, because the scan emits 1 per row when requiredColumns is empty, and JDBCRelation can filter rows on non-required columns by constructing a WHERE clause. This produced the plan below:

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#38L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#42L])
      +- Project
         +- Scan JDBCRelation(jdbc:h2:mem:testdb0,TEST.PEOPLE,[Lorg.apache.spark.Partition;@3e906375,{user=testUser, password=testPass, url=jdbc:h2:mem:testdb0, dbtable=TEST.PEOPLE})[] PushedFilters: [EqualTo(NAME,fred)]

So please take this into account.

Edit: Actually, I think requiredColumns should include the columns in pushed filters, not exclude them. I will try to fix this after asking whether it should be fixed that way.
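
To make that suggestion concrete, a minimal sketch of the inclusive approach (the object and helper names are hypothetical; this is not Spark's actual DataSourceStrategy code):

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull, IsNull}

object PushedFilterColumns {
  // Hypothetical helper: the column names a pushed filter refers to.
  // (sources.Filter did not expose a references method at the time.)
  def filterColumns(f: Filter): Seq[String] = f match {
    case EqualTo(attr, _) => Seq(attr)
    case IsNull(attr)     => Seq(attr)
    case IsNotNull(attr)  => Seq(attr)
    case _                => Seq.empty // other Filter cases elided
  }

  // Include, rather than exclude, pushed-filter columns when building the
  // requiredColumns passed to buildScan, so sources like Parquet still see
  // the columns their pushed filters need.
  def requiredColumns(projected: Seq[String], pushed: Seq[Filter]): Array[String] =
    (projected ++ pushed.flatMap(filterColumns)).distinct.toArray
}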

SparkQA commented Dec 24, 2015

Test build #48277 has finished for PR 10427 at commit 56d14cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class JavaWordBlacklist
      • class JavaDroppedWordsCounter

maropu (Member Author) commented Dec 24, 2015

@HyukjinKwon ISTM the root problem is that Catalyst cannot fill in required columns on the fly for data sources. The unhandledFilters interface has little to do with that problem.

HyukjinKwon (Member): @maropu I believe it might be a Parquet-specific issue. AFAIK, the columns in filters should be set in the requestedSchema for Parquet, but this change excludes the columns used only by pushed filters, which ends up with no columns yet still has filters. As a result, it produces a wrong result.

Actually, I am now talking with @yhuai to check whether it is really an issue. Personally, I think we can simply include the columns for pushed filters as well, just for safety.

maropu (Member Author) commented Dec 24, 2015

If this patch is merged, the comment below will no longer be accurate for JDBCRelation:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L281

Is it okay to fix it?

HyukjinKwon (Member): And you are right; I think the comment I mentioned is not related to this PR. Let's wait for their comments!

maropu (Member Author) commented Dec 24, 2015

@HyukjinKwon Is adding special handling for Parquet better? I think the columns required by push-down filters should be passed to each data source; that is the more natural way.

HyukjinKwon (Member): @maropu I agree that could be another way! But I just think an interface should be inclusive, not exclusive. Handling this in ParquetRelation might mean other data sources have to deal with similar problems (including external data sources).

maropu (Member Author) commented Dec 24, 2015

@HyukjinKwon Anyway, I added a test reflecting this discussion. Thanks.

maropu (Member Author) commented Feb 5, 2016

@yhuai ping

@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filter(JDBCRDD.compileFilter(_) == null)
Contributor: Seems you want to use isEmpty?

maropu (Member Author) commented Feb 5, 2016

@yhuai Fixed.

SparkQA commented Feb 5, 2016

Test build #50818 has finished for PR 10427 at commit b4dc961.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

  // Check if JDBCRDD.compileFilter can accept input filters
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
Contributor: Thank you for the update!

When we were using == null (I guess all filters were marked as unhandled then, right?), all tests still passed. So I am wondering whether the existing tests are sufficient.

maropu (Member Author): I added tests, though ISTM JDBCRDD.compileFilter does not return None for any given Filter, because the function can compile every Filter implemented in sql.sources.filters.
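
For reference, a minimal sketch of the shape compileFilter takes after this change (an Option[String] result, matching the isEmpty call above); the case list is abbreviated and compileValue is a hypothetical helper, so this is not the actual JDBCRDD code:

import org.apache.spark.sql.sources._

object CompileFilterSketch {
  // Sketch only: translate a data source Filter into a SQL WHERE fragment,
  // returning None for anything the JDBC source cannot push down.
  def compileFilter(f: Filter): Option[String] = f match {
    case EqualTo(attr, value)            => Some(s"$attr = ${compileValue(value)}")
    case LessThan(attr, value)           => Some(s"$attr < ${compileValue(value)}")
    case GreaterThan(attr, value)        => Some(s"$attr > ${compileValue(value)}")
    case LessThanOrEqual(attr, value)    => Some(s"$attr <= ${compileValue(value)}")
    case GreaterThanOrEqual(attr, value) => Some(s"$attr >= ${compileValue(value)}")
    case IsNull(attr)                    => Some(s"$attr IS NULL")
    case IsNotNull(attr)                 => Some(s"$attr IS NOT NULL")
    case _                               => None // remaining cases elided
  }

  // Hypothetical helper: naive literal quoting, for the sketch only.
  def compileValue(value: Any): Any = value match {
    case s: String => s"'${s.replace("'", "''")}'"
    case v         => v
  }
}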

SparkQA commented Feb 7, 2016

Test build #50904 has finished for PR 10427 at commit 1ebacc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

df
}
sql("SELECT * FROM foobar WHERE THEID < 1").explain(true)
sql("SELECT * FROM foobar WHERE (THEID + 1) < 2").explain(true)
Contributor: Remove these two lines?

yhuai (Contributor) commented Feb 8, 2016

Thank you for the update! Overall it looks good. There are two lines of unnecessary changes; let's remove them, and we can merge once it passes Jenkins.

SparkQA commented Feb 9, 2016

Test build #50968 has finished for PR 10427 at commit 7038bc0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 9, 2016

Test build #50976 has finished for PR 10427 at commit 7a7b9fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member Author) commented Feb 10, 2016

@yhuai Okay, ready to commit.

yhuai (Contributor) commented Feb 10, 2016

LGTM. Merging to master.

asfgit closed this in 6f710f9 on Feb 10, 2016
zzcclp added a commit to zzcclp/spark that referenced this pull request Jul 27, 2016
maropu deleted the RemoveFilterInJdbcScan branch July 5, 2017