From 8b1c5a39be72b402f487f378bfc74dbb74ebeae3 Mon Sep 17 00:00:00 2001 From: fenzhu Date: Fri, 10 Jun 2022 17:19:50 +0800 Subject: [PATCH] [CARMEL-6010] Distinguish duplicate execution event log for command query (#963) --- .../sql/catalyst/QueryPlanningTracker.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 19 +++++++++++++++++-- .../spark/sql/execution/QueryExecution.scala | 3 +++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 38d4b564b2096..f8d5a7a810709 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -112,6 +112,12 @@ class QueryPlanningTracker { private val viewMap = new java.util.HashMap[String, Int] + private var realExecutionId: Option[String] = None + + def markExecution(executionId: String): Unit = { + realExecutionId = Some(executionId) + } + /** * Measure the start and end time of a phase. Note that if this function is called multiple * times for the same phase, the recorded start time will be the start time of the first call, @@ -196,6 +202,12 @@ class QueryPlanningTracker { "\n=== view usage start: [" + viewMap.asScala.toSeq.mkString("|") + "] view usage end ===\n" } + def alreadyExecuted: Boolean = realExecutionId.isDefined + + def getRealExecutionInfo: String = { + s"=== Real execution Id:${realExecutionId.getOrElse("unknown")}:Real execution end\n\n" + } + def formattedRulesByTime(): String = { if (rulesMap.isEmpty) { return "=== None Metrics of Analyzer/Optimizer Rules Applied ===" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5f01e3e53c29c..f623921c2e35d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -228,9 +228,9 @@ class Dataset[T] private[sql]( // to happen right away to let these side effects take place eagerly. val plan = queryExecution.analyzed match { case c: Command => - LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect())) + LocalRelation(c.output, withCommandMarkedAction(queryExecution)(_.executeCollect())) case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => - LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect())) + LocalRelation(u.output, withCommandMarkedAction(queryExecution)(_.executeCollect())) case _ => queryExecution.analyzed } @@ -240,6 +240,21 @@ class Dataset[T] private[sql]( plan } + /** + * Mark the command execution history in tracker to distinguish the + * real execution in event log and Spark ui. + */ + private def withCommandMarkedAction[U](qe: QueryExecution)(action: SparkPlan => U) = { + val sideEffects = withAction("command", qe) { plan => + val result = action(plan) + val executionId = qe.sparkSession.sparkContext.getLocalProperty( + SQLExecution.EXECUTION_ID_KEY) + qe.tracker.markExecution(executionId) + result + } + sideEffects + } + /** * Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the * passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 7a2e7b1189fa1..8369a939f9f0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -206,6 +206,9 @@ class QueryExecution( try { // TODO: Set spark.sql.ui.explainMode to cost since 3.1.0 if (SQLConf.get.uiPlanWithMetrics) { + if (tracker.alreadyExecuted) { + append(tracker.getRealExecutionInfo) + } append(stringWithStats) if (tracker.hasView) { append(tracker.formattedViewUsage())