[SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings #18600
Conversation
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
  !SparkSession.getActiveSession.get.sessionState.conf.getConf(
    SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
`HadoopFsRelation` never goes into `RowDataSourceScanExec`.
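For context, a minimal self-contained sketch of the conf check this branch performs (hedged: `sessionState` is `private[sql]`, so this compiles only inside Spark's `sql` package):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// True when the Parquet vectorized reader is disabled for the active session.
def vectorizedParquetDisabled(): Boolean =
  !SparkSession.getActiveSession.get.sessionState.conf.getConf(
    SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
```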
@@ -395,25 +367,33 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
    .asInstanceOf[Seq[Attribute]]
    // Match original case of attributes.
    .map(relation.attributeMap)
    // Don't request columns that are only referenced by pushed filters.
    .filterNot(handledSet.contains)
In this branch, `filterSet` is a subset of `projectSet`, so it's a no-op.
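A minimal illustration with hypothetical column sets of why that `filterNot` removes nothing here:

```scala
// Hypothetical sets: in this branch every filter-referenced column is also
// projected, so the set of columns referenced only by pushed filters is empty.
val projectSet = Set("a", "b")
val filterSet  = Set("a")                 // subset of projectSet
val handledSet = filterSet -- projectSet  // empty
val requested  = Seq("a", "b").filterNot(handledSet.contains)
assert(requested == Seq("a", "b"))        // the filterNot was a no-op
```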
Test build #79523 has finished for PR 18600 at commit
retest this please
Test build #79527 has finished for PR 18600 at commit
extends DataSourceScanExec {

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
`def output: Seq[Attribute]` -> `override def output: Seq[Attribute]`?
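I.e., the suggested one-line change:

```scala
override def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
```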
  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata,
- relation.catalogTable.map(_.identifier))
+ relation.relation, relation.catalogTable.map(_.identifier))
Nit: can we make this into two lines during the refactoring?

  relation.relation,
  relation.catalogTable.map(_.identifier))
  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata,
- relation.catalogTable.map(_.identifier))
+ relation.relation, relation.catalogTable.map(_.identifier))
ditto.
- output: Seq[Attribute],
+ fullOutput: Seq[Attribute],
+ requiredColumnsIndex: Seq[Int],
+ filters: Set[Filter],
Start it in this PR?
  rdd: RDD[InternalRow],
  @transient relation: BaseRelation,
- override val outputPartitioning: Partitioning,
- override val metadata: Map[String, String],
uh... This is not being used after our previous refactoring.
`metadata` is still needed. It is being used here.
// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

// These metadata values make scan plans uniquely identifiable for equality checking.
// TODO(SPARK-17701) using strings for equality checking is brittle
val metadata: Map[String, String] = {
We need to keep it.
The target of this cleanup PR is to remove the metadata...
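To make the brittleness concrete, a standalone sketch (hypothetical classes, not Spark code) contrasting string-map equality with structural equality over constructor parameters:

```scala
// Equality via a rendered metadata string: formatting and ordering leak into it.
case class ScanByMetadata(metadata: Map[String, String])

// Equality via structural parameters: sets compare order-insensitively.
case class ScanByParams(requiredColumnsIndex: Seq[Int], pushedFilters: Set[String])

val a = ScanByMetadata(Map("PushedFilters" -> "[IsNotNull(a), GreaterThan(a,1)]"))
val b = ScanByMetadata(Map("PushedFilters" -> "[GreaterThan(a,1), IsNotNull(a)]"))
assert(a != b)  // same filters, different rendering order: exchange reuse is missed

val c = ScanByParams(Seq(0, 1), Set("IsNotNull(a)", "GreaterThan(a,1)"))
val d = ScanByParams(Seq(0, 1), Set("GreaterThan(a,1)", "IsNotNull(a)"))
assert(c == d)  // structural comparison is robust to ordering
```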
LGTM except the above three comments.
@@ -72,11 +72,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }

-  /**
-   * @return Metadata that describes more details of this SparkPlan.
-   */
-  def metadata: Map[String, String] = Map.empty
We introduced `metadata` to work around the equality issue of the data source scan. Now it's fixed, and we can remove it.
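After the refactoring, equality falls out of the case-class parameters themselves. A simplified sketch of the node's shape (field types abridged; see the diff above):

```scala
// Abridged stand-in for the refactored RowDataSourceScanExec: with plain
// data in the constructor, case-class equality gives a reliable sameResult.
case class RowScanSketch(
    fullOutput: Seq[String],         // stands in for Seq[Attribute]
    requiredColumnsIndex: Seq[Int],
    filters: Set[String])            // stands in for Set[Filter]
```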
@@ -31,7 +31,6 @@ class SparkPlanInfo(
    val nodeName: String,
    val simpleString: String,
    val children: Seq[SparkPlanInfo],
-   val metadata: Map[String, String],
This is a developer API, and I don't think users can do anything useful with `metadata` because it was just a hack. It should be safe to remove it.
It seems that @LantaoJin brought this back for event logging in #22353.
Test build #79552 has finished for PR 18600 at commit
LGTM
Thanks! Merging to master.
## What changes were proposed in this pull request?

In #18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.

## How was this patch tested?

A regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19237 from cloud-fan/event.
…tion like file path to event log

## What changes were proposed in this pull request?

The `metadata` field was removed from `SparkPlanInfo` in #18600. Correspondingly, a lot of metadata was also removed from the `SparkListenerSQLExecutionStart` event in the Spark event log. If we want to analyze the event log to get all input paths, we can no longer get them, and the `simpleString` of the `SparkPlanInfo` JSON only displays 100 characters, which doesn't help.

Before 2.3, the fragment of `SparkListenerSQLExecutionStart` in the event log looked like the following (it contains the `metadata` field, which has the intact information):

> {"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, the `metadata` field was removed:

> {"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So this patch adds the field back to the `SparkPlanInfo` class, so the metadata is logged to the event log again. Intact information in the event log is very useful for offline job analysis.

## How was this patch tested?

Unit test.

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6dc5921)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
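For reference, an abridged sketch of `SparkPlanInfo` with the `metadata` field restored (the real class lives in `org.apache.spark.sql.execution`; other members omitted):

```scala
class SparkPlanInfoSketch(
    val nodeName: String,
    val simpleString: String,
    val children: Seq[SparkPlanInfoSketch],
    val metadata: Map[String, String])  // restored by SPARK-25357
```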
What changes were proposed in this pull request?
Currently, `RowDataSourceScanExec` and `FileSourceScanExec` rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.

To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.
This PR refactors `RowDataSourceScanExec`; `FileSourceScanExec` will be fixed in a follow-up PR.

How was this patch tested?
Existing tests.
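As a closing, hedged illustration (not taken from this PR's tests) of what reliable plan equality buys: Spark's exchange-reuse rule can only deduplicate subplans that compare equal via `sameResult`.

```scala
// Assuming an active SparkSession `spark`. Both branches of the union run the
// same aggregation; when the underlying scans compare equal via sameResult,
// the physical plan can contain a ReusedExchange instead of two identical shuffles.
val agg = spark.range(100).groupBy("id").count()
agg.union(agg).explain()
```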