Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings #18600

Closed
wants to merge 2 commits into from

Conversation

cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

Currently, RowDataSourceScanExec and FileSourceScanExec rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.

To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.

This PR refactors RowDataSourceScanExec, FileSourceScanExec will be fixed in the follow-up PR.

How was this patch tested?

existing tests

case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
!SparkSession.getActiveSession.get.sessionState.conf.getConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HadoopFsRelation never goes into RowDataSourceScanExec

@@ -395,25 +367,33 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
.asInstanceOf[Seq[Attribute]]
// Match original case of attributes.
.map(relation.attributeMap)
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this branch, filterSet is a subset of projectSet, so it's a no-op.

@cloud-fan
Copy link
Contributor Author

cc @ericl @gatorsmile

@SparkQA
Copy link

SparkQA commented Jul 11, 2017

Test build #79523 has finished for PR 18600 at commit 5008eb6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 11, 2017

Test build #79527 has finished for PR 18600 at commit 5008eb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

extends DataSourceScanExec {

def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def output: Seq[Attribute] -> override def output: Seq[Attribute]?

scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
relation.relation, relation.catalogTable.map(_.identifier))
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; can we make this into two lines during refactoring?

relation.relation,
relation.catalogTable.map(_.identifier))

scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
relation.relation, relation.catalogTable.map(_.identifier))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

output: Seq[Attribute],
fullOutput: Seq[Attribute],
requiredColumnsIndex: Seq[Int],
filters: Set[Filter],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start it in this PR?

rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh... This is not being used after our previous refactoring.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metadata is still needed. It is being used here.

// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

// These metadata values make scan plans uniquely identifiable for equality checking.
// TODO(SPARK-17701) using strings for equality checking is brittle
val metadata: Map[String, String] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target of this cleanup PR is to remove the metadata...

@gatorsmile
Copy link
Member

LGTM except the above three comments.

@@ -72,11 +72,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}

/**
* @return Metadata that describes more details of this SparkPlan.
*/
def metadata: Map[String, String] = Map.empty
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We introduced metadata to work around the equality issue of data source scan. Now it's fixed, and we can remove it.

@@ -31,7 +31,6 @@ class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metadata: Map[String, String],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a developer api and I don't think users can do anything useful with metadata because it was just a hack. should be safe to remove it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that @LantaoJin brings back this for event loging at #22353 .

@SparkQA
Copy link

SparkQA commented Jul 12, 2017

Test build #79552 has finished for PR 18600 at commit 2dc4ce1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

LGTM

@gatorsmile
Copy link
Member

Thanks! Merging to master.

@asfgit asfgit closed this in 780586a Jul 12, 2017
asfgit pushed a commit that referenced this pull request Sep 15, 2017
## What changes were proposed in this pull request?

In #18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.

## How was this patch tested?

a regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19237 from cloud-fan/event.
asfgit pushed a commit that referenced this pull request Sep 13, 2018
…tion like file path to event log

## What changes were proposed in this pull request?

Field metadata removed from SparkPlanInfo in #18600 . Corresponding, many meta data was also removed from event SparkListenerSQLExecutionStart in Spark event log. If we want to analyze event log to get all input paths, we couldn't get them. Instead, simpleString of SparkPlanInfo JSON only display 100 characters, it won't help.

Before 2.3, the fragment of SparkListenerSQLExecutionStart in event log looks like below (It contains the metadata field which has the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, metadata field was removed.
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So I add this field back to SparkPlanInfo class. Then it will log out the meta data to event log. Intact information in event log is very useful for offline job analysis.

## How was this patch tested?
Unit test

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6dc5921)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
asfgit pushed a commit that referenced this pull request Sep 13, 2018
…tion like file path to event log

## What changes were proposed in this pull request?

Field metadata removed from SparkPlanInfo in #18600 . Corresponding, many meta data was also removed from event SparkListenerSQLExecutionStart in Spark event log. If we want to analyze event log to get all input paths, we couldn't get them. Instead, simpleString of SparkPlanInfo JSON only display 100 characters, it won't help.

Before 2.3, the fragment of SparkListenerSQLExecutionStart in event log looks like below (It contains the metadata field which has the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, metadata field was removed.
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So I add this field back to SparkPlanInfo class. Then it will log out the meta data to event log. Intact information in event log is very useful for offline job analysis.

## How was this patch tested?
Unit test

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6dc5921)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
fjh100456 pushed a commit to fjh100456/spark that referenced this pull request Sep 13, 2018
…tion like file path to event log

## What changes were proposed in this pull request?

Field metadata removed from SparkPlanInfo in apache#18600 . Corresponding, many meta data was also removed from event SparkListenerSQLExecutionStart in Spark event log. If we want to analyze event log to get all input paths, we couldn't get them. Instead, simpleString of SparkPlanInfo JSON only display 100 characters, it won't help.

Before 2.3, the fragment of SparkListenerSQLExecutionStart in event log looks like below (It contains the metadata field which has the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After apache#18600, metadata field was removed.
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So I add this field back to SparkPlanInfo class. Then it will log out the meta data to event log. Intact information in event log is very useful for offline job analysis.

## How was this patch tested?
Unit test

Closes apache#22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants