Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26119][CORE][WEBUI]Task summary table should contain only successful tasks' metrics #23088

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
// cheaper for disk stores (avoids deserialization).
val count = {
Utils.tryWithResource(
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
if (store.isInstanceOf[InMemoryStore]) {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.STATUS)
.first("SUCCESS")
.last("SUCCESS")
.closeableIterator()
} else {
store.view(classOf[TaskDataWrapper])
.parent(stageKey)
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
}
) { it =>
var _count = 0L
while (it.hasNext()) {
Expand Down Expand Up @@ -221,30 +230,50 @@ private[spark] class AppStatusStore(
// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

// TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
// For InMemory case, it is efficient to find using the following code. But for diskStore case
// we need an efficient solution to avoid deserialization time overhead. For that, we need to
// rework on the way indexing works, so that we can index by specific metrics for successful
// and failed tasks differently (would be tricky). Also would require changing the disk store
// version (to invalidate old stores).
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
  if (store.isInstanceOf[InMemoryStore]) {
    // In-memory store: deserialization is cheap, so materialize the tasks in
    // index order, keep only successful ones, and read the value at each
    // quantile position directly.
    val quantileTasks = store.view(classOf[TaskDataWrapper])
      .parent(stageKey)
      .index(index)
      .first(0L)
      .asScala
      .filter { _.status == "SUCCESS" } // Filter "SUCCESS" tasks
      .toIndexedSeq

    indices.map { index =>
      fn(quantileTasks(index.toInt)).toDouble
    }.toIndexedSeq
  } else {
    // Disk store: deserializing every task is expensive, so walk the index
    // iterator once and skip straight to each quantile position (see the TODO
    // above — this path cannot yet filter out failed tasks efficiently).
    Utils.tryWithResource(
      store.view(classOf[TaskDataWrapper])
        .parent(stageKey)
        .index(index)
        .first(0L)
        .closeableIterator()
    ) { it =>
      var last = Double.NaN
      var currentIdx = -1L
      indices.map { idx =>
        if (idx == currentIdx) {
          // Duplicate quantile index: reuse the previously read value.
          last
        } else {
          val diff = idx - currentIdx
          currentIdx = idx
          if (it.skip(diff - 1)) {
            last = fn(it.next()).toDouble
            last
          } else {
            // Ran off the end of the iterator; no value at this position.
            Double.NaN
          }
        }
      }.toIndexedSeq
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

// Writing only FAILED tasks must yield no task summary at all (None),
// since summaries are computed over successful tasks only (SPARK-26119).
test("only successful tasks have taskSummary") {
  val store = new InMemoryStore()
  (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
  val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
  assert(appStore.size === 0)
}

// Mixes FAILED (odd ids) and SUCCESS (even ids) tasks and checks that the
// summary quantiles are computed from the successful tasks' metrics only.
test("summary should contain task metrics of only successful tasks") {
  val store = new InMemoryStore()

  for (i <- 0 to 5) {
    if (i % 2 == 1) {
      store.write(newTaskData(i, status = "FAILED"))
    } else {
      store.write(newTaskData(i))
    }
  }

  val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get

  // Successful tasks have ids 0, 2, 4; newTaskData uses the id for every
  // numeric metric, so these are the expected executorRunTime samples.
  val values = Array(0.0, 2.0, 4.0)

  val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
  dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
    assert(expected === actual)
  }
}

private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
val store = new InMemoryStore()
val values = (0 until count).map { i =>
Expand All @@ -93,12 +121,11 @@ class AppStatusStoreSuite extends SparkFunSuite {
}
}

// Builds a TaskDataWrapper whose numeric fields are all `i`. `status`
// defaults to "SUCCESS" so tests can opt into FAILED tasks explicitly.
private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
  new TaskDataWrapper(
    i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
    i, i, i, i, i, i, i, i, i, i,
    i, i, i, i, i, i, i, i, i, i,
    i, i, i, i, stageId, attemptId)
}

}