From cb7bbc0379e9dd0952a6fcc5bb91f9497a40529d Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 12 Aug 2019 11:47:29 -0700
Subject: [PATCH] [SPARK-28638][WEBUI] Task summary should only contain
 successful tasks' metrics

## What changes were proposed in this pull request?

Currently, on requesting summary metrics, cached data are returned if the current number of "SUCCESS" tasks is the same as the value in the cached data.

However, the number of "SUCCESS" tasks is wrong when there are running tasks. In `AppStatusStore`, the KVStore is an `ElementTrackingStore` instead of an `InMemoryStore`, so the value counted is always the number of "SUCCESS" tasks plus "RUNNING" tasks. Thus, even after the running tasks have finished, the out-of-date cached data is still returned.

This PR fixes the code that counts the number of "SUCCESS" tasks.

## How was this patch tested?

Tested manually: run

```
sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect()
```

and keep refreshing the stage page; the task summary metrics are wrong before the fix.

### Before fix:
![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png)

### After fix:
![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png)

Closes #25369 from gengliangwang/fixStagePage.

Authored-by: Gengliang Wang
Signed-off-by: Marcelo Vanzin
(cherry picked from commit 48d04f74ca895497b9d8bab18c7708f76f55c520)
Signed-off-by: Marcelo Vanzin
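To illustrate the root cause, here is a minimal sketch (not part of the patch; it assumes code living under the `org.apache.spark` namespace, since these classes are `private[spark]`, and it mirrors the constructor usage in the test changes below):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

// The live UI wraps its in-memory store in an ElementTrackingStore, so the
// old store.isInstanceOf[InMemoryStore] check failed and the live UI took the
// branch whose count also includes "RUNNING" tasks.
val inMemory = new InMemoryStore()
val tracking = new ElementTrackingStore(inMemory, new SparkConf())
println(tracking.isInstanceOf[InMemoryStore]) // false: the wrapper hides the backing store
```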
---
 .../apache/spark/status/AppStatusStore.scala  | 11 +++++--
 .../spark/status/AppStatusStoreSuite.scala    | 32 ++++++++++++-------
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 84716f8471bd8..6568f5f4a98d5 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -126,6 +126,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -146,7 +152,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -235,7 +242,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
        val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 75a658161d3ff..165fdb71cc78b 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
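For reference, a hedged sketch of how the two store configurations exercised by the new tests both satisfy the fixed `isInMemoryStore` check; `createLiveStore` mirrors the helper added to `AppStatusStoreSuite` above, and the snippet again assumes `private[spark]` access:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.{AppStatusListener, AppStatusStore, ElementTrackingStore}
import org.apache.spark.util.kvstore.InMemoryStore

// Mirrors the helper added to AppStatusStoreSuite in this patch.
def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
  val conf = new SparkConf()
  val store = new ElementTrackingStore(inMemoryStore, conf)
  val listener = new AppStatusListener(store, conf, true, None)
  new AppStatusStore(store, listener = Some(listener))
}

val inMemory = new InMemoryStore()

// History-server style: the store itself is an InMemoryStore and no listener
// is set, so isInMemoryStore holds via store.isInstanceOf[InMemoryStore].
val historyStyleStore = new AppStatusStore(inMemory)

// Live UI style: the ElementTrackingStore wrapper hides the InMemoryStore,
// but listener.isDefined is true, so isInMemoryStore still selects the
// SUCCESS-only counting path.
val liveStyleStore = createLiveStore(inMemory)
```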