[SPARK-25357][SQL] Add metadata to SparkPlanInfo to dump more information like file path to event log

## What changes were proposed in this pull request?

The `metadata` field was removed from SparkPlanInfo in #18600. Correspondingly, much of the metadata was also removed from the SparkListenerSQLExecutionStart event in the Spark event log. If we want to analyze the event log to get all input paths, we cannot. The simpleString in the SparkPlanInfo JSON displays only 100 characters, so it does not help either.
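
The truncation problem described above can be illustrated with a small, self-contained sketch. `TruncationSketch` and `abbreviate` are hypothetical names, and the helper is only a stand-in for a 100-character limit, not the routine Spark itself uses; the paths are made up.

```scala
object TruncationSketch {
  // Hypothetical stand-in for a 100-character display limit;
  // this is an assumption for illustration, not Spark's own code.
  def abbreviate(value: String, maxLen: Int = 100): String =
    if (value.length <= maxLen) value else value.take(maxLen - 3) + "..."

  def main(args: Array[String]): Unit = {
    // Two made-up locations that differ only in the trailing partition directory.
    val prefix = "InMemoryFileIndex[hdfs://example/" + ("dir/" * 30)
    val a = prefix + "dt=20180903]"
    val b = prefix + "dt=20180904]"
    // Both abbreviate to the same string, so the partition cannot be recovered.
    println(abbreviate(a) == abbreviate(b))
  }
}
```

Once the differing part of a path lies past the cut-off, no amount of post-processing of the truncated string can restore it, which is why the full value has to be logged separately.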

Before 2.3, the SparkListenerSQLExecutionStart fragment in the event log looked like the following (it contains the `metadata` field, which holds the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, the `metadata` field was removed:
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So this change adds the field back to the SparkPlanInfo class, so that the metadata is written to the event log again. Intact information in the event log is very useful for offline job analysis.
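
As a rough illustration of the offline analysis this enables, the sketch below walks a plan tree and collects every `Location` entry from the per-node metadata. `PlanNode` and `inputLocations` are hypothetical names: `PlanNode` is a simplified stand-in for SparkPlanInfo that keeps only the fields needed here, and the sample path is invented.

```scala
// Simplified stand-in for SparkPlanInfo (an assumption for this sketch):
// only the fields needed to demonstrate the traversal.
final case class PlanNode(
    nodeName: String,
    metadata: Map[String, String],
    children: Seq[PlanNode])

object PlanMetadataSketch {
  // Depth-first walk that gathers the "Location" entry of every node that has one.
  def inputLocations(node: PlanNode): Seq[String] =
    node.metadata.get("Location").toSeq ++ node.children.flatMap(inputLocations)

  def main(args: Array[String]): Unit = {
    // Made-up plan: a projection over a single file scan.
    val scan = PlanNode(
      "FileScan parquet",
      Map("Location" -> "InMemoryFileIndex[hdfs://example/path/dt=20180904]"),
      Nil)
    val plan = PlanNode("Project", Map.empty, Seq(scan))
    inputLocations(plan).foreach(println)
  }
}
```

A real analysis tool would first deserialize the event JSON into the plan tree; that step is left out here because it depends on the JSON library in use.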

## How was this patch tested?
Unit test

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6dc5921)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LantaoJin authored and cloud-fan committed Sep 13, 2018
1 parent 776dc42 commit 6f4d647
Showing 3 changed files with 17 additions and 5 deletions.
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution

-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -28,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetricInfo
  * Stores information about a SQL SparkPlan.
  */
 @DeveloperApi
-@JsonIgnoreProperties(Array("metadata")) // The metadata field was removed in Spark 2.3.
 class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
+    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {

   override def hashCode(): Int = {
@@ -59,6 +57,12 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }

-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
+    // dump the file scan metadata (e.g file path) to event log
+    val metadata = plan match {
+      case fileScan: FileSourceScanExec => fileScan.metadata
+      case _ => Map[String, String]()
+    }
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      metadata, metrics)
   }
 }
@@ -46,7 +46,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
       """.stripMargin
     val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
     val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
-      new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
+      new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
     assert(reconstructedEvent == expectedEvent)
   }
 }
@@ -50,4 +50,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") {
+    withTempPath { path =>
+      spark.range(5).write.parquet(path.getAbsolutePath)
+      val f = spark.read.parquet(path.getAbsolutePath)
+      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
+    }
+  }
 }
