Skip to content

Commit

Permalink
[CARMEL-6010] Distinguish duplicate execution event log for command q…
Browse files Browse the repository at this point in the history
…uery (#963)
  • Loading branch information
fenzhu authored and GitHub Enterprise committed Jun 10, 2022
1 parent 5550fdb commit 8b1c5a3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ class QueryPlanningTracker {

private val viewMap = new java.util.HashMap[String, Int]

// Id recorded by `markExecution`; stays `None` until a non-null id has been recorded.
private var realExecutionId: Option[String] = None

/**
 * Records `executionId` as the id of the execution that really ran this query.
 *
 * @param executionId the execution id to remember. A `null` id (for example a
 *                    thread-local property that is not set) is stored as `None`
 *                    rather than `Some(null)`.
 */
def markExecution(executionId: String): Unit = {
  // Option(...) maps null to None, so `realExecutionId.isDefined` never holds for a
  // missing id and the literal text "null" is never rendered as an execution id.
  realExecutionId = Option(executionId)
}

/**
* Measure the start and end time of a phase. Note that if this function is called multiple
* times for the same phase, the recorded start time will be the start time of the first call,
Expand Down Expand Up @@ -196,6 +202,12 @@ class QueryPlanningTracker {
"\n=== view usage start: [" + viewMap.asScala.toSeq.mkString("|") + "] view usage end ===\n"
}

/** Whether an execution id has been stored in `realExecutionId`. */
def alreadyExecuted: Boolean = realExecutionId.nonEmpty

/**
 * Renders the stored execution id as a delimited marker line followed by a blank line.
 *
 * @return the marker text; the id part is "unknown" when no id has been stored.
 */
def getRealExecutionInfo: String = {
  val recordedId = realExecutionId.getOrElse("unknown")
  s"=== Real execution Id:${recordedId}:Real execution end\n\n"
}

def formattedRulesByTime(): String = {
if (rulesMap.isEmpty) {
return "=== None Metrics of Analyzer/Optimizer Rules Applied ==="
Expand Down
19 changes: 17 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ class Dataset[T] private[sql](
// to happen right away to let these side effects take place eagerly.
val plan = queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
LocalRelation(c.output, withCommandMarkedAction(queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
LocalRelation(u.output, withCommandMarkedAction(queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
Expand All @@ -240,6 +240,21 @@ class Dataset[T] private[sql](
plan
}

/**
 * Runs `action` through `withAction("command", qe)` and, once the action has returned,
 * records the current value of the `SQLExecution.EXECUTION_ID_KEY` local property on
 * `qe.tracker`, so that the real execution can be told apart in the event log and the
 * Spark UI.
 *
 * @param qe     the query execution whose tracker receives the execution id
 * @param action the work to run against the physical plan
 * @return whatever `withAction` returns for the wrapped action
 */
private def withCommandMarkedAction[U](qe: QueryExecution)(action: SparkPlan => U) = {
  withAction("command", qe) { physicalPlan =>
    val outcome = action(physicalPlan)
    // The id is read only after the action has completed, and from inside the closure
    // handed to withAction.
    // NOTE(review): presumably the execution id is only set while withAction is
    // running - confirm against withAction / SQLExecution.
    val sparkContext = qe.sparkSession.sparkContext
    qe.tracker.markExecution(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
    outcome
  }
}

/**
* Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
* passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ class QueryExecution(
try {
// TODO: Set spark.sql.ui.explainMode to cost since 3.1.0
if (SQLConf.get.uiPlanWithMetrics) {
if (tracker.alreadyExecuted) {
append(tracker.getRealExecutionInfo)
}
append(stringWithStats)
if (tracker.hasView) {
append(tracker.formattedViewUsage())
Expand Down

0 comments on commit 8b1c5a3

Please sign in to comment.