[SPARK-26119][CORE][WEBUI]Task summary table should contain only successful tasks' metrics #23088
Conversation
Test build #99016 has finished for PR 23088 at commit
This needs to handle more scenarios and some optimization, so I'm closing it for the moment. I will update the PR.
@@ -238,8 +239,16 @@ private[spark] class AppStatusStore(
val diff = idx - currentIdx
`diff` could be negative here?
Hi @gengliangwang, I will update the PR. Actually, more changes are required to fix this. Thanks
As the task section shows all the tasks, I think it may be better to show aggregated metrics for all the tasks.
@gengliangwang Previous versions of Spark showed only completed tasks' summary metrics. I think it makes more sense to show only finished tasks, rather than showing metrics of killed and failed tasks.
When evaluating these things I use git blame (or visual tools here and in an IDE) to figure out if there was a previous change that explicitly tried to change the behavior, and whether it was on purpose, wrong, or incomplete. That would really help understand whether it's just a clear bug or something else.
@srowen Yes. I did check the commit history, and the PR which modified the behavior didn't mention this behavior change. If it is intentional, then we should change the title of the table from "summary metrics for completed tasks" to "summary metrics for all tasks".
(Still need more things to test, so changed to WIP)
Force-pushed from 8f4498e to 96fd281
Test build #99137 has finished for PR 23088 at commit
Test build #99139 has finished for PR 23088 at commit
@srowen, could you please review the PR?
I'm not sure about the code changes, as I don't know this part well. It seems to be changing a number of things beyond just filtering for "SUCCESS"ful tasks.
When did the behavior change? You said you found the commit.
.zipWithIndex
.filter(x => indices.contains(x._2))

if(quantileTasks.size > indices.length) {
The formatting needs adjustment around here: space after 'if', less indent on the next line, no space after (, etc.
Done
.index(index)
.first(0L)
.asScala
.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
Why not check for "SUCCESS"?
Done
}
}
}.toIndexedSeq
val quantileTasks = store.view(classOf[TaskDataWrapper])
This used to get an iterator. I wonder if it's safe to materialize the whole collection? It seems to take care to use the iterator to skip over things, for example.
Yes. That was the original intent. Unfortunately, after converting to the Scala collection, the skip() functionality is no longer available. Also, the kvstore iterator doesn't have any filter API to filter the "SUCCESS" tasks.
The original PR was for reducing the computational time for loading the stage page from the disk store (for the history server), by avoiding in-memory sorting.
I get it, but it seems like there was a particular reason for using an iterator and skipping, and I'm not sure we can just undo it. If this is only to filter for "SUCCESS", that much seems easy enough in the original code with an 'if' statement somewhere, no? Similar code above still uses an iterator.
Yes. If we do `if (status == "SUCCESS")` for every iterator value, we can't use the skip() function. Because earlier we knew the exact index we needed to take, i.e. we could skip directly to the 25th percentile, the 50th percentile, and so on. Now, we don't know which index holds the 25th percentile of the "SUCCESS" values unless we iterate over each one.
Otherwise, we have to filter the "SUCCESS" tasks up front, as I have done in the PR.
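To illustrate the trade-off described above (a sketch with made-up data, not the actual Spark code; `Task` and the sorted ordering are simplified stand-ins for what the kvstore iterator would yield):

```scala
case class Task(runTime: Long, status: String)

// Stand-in for the values the kvstore iterator yields in index-sorted order.
val tasks: Seq[Task] = Seq(
  Task(10, "SUCCESS"), Task(20, "FAILED"), Task(30, "FAILED"),
  Task(40, "SUCCESS"), Task(50, "SUCCESS"), Task(60, "SUCCESS"))

// Without filtering, the index of any quantile is known up front, so a
// kvstore iterator could skip() straight to it:
val medianOfAll = tasks(tasks.size / 2).runTime  // index 3, known in advance

// Once only "SUCCESS" tasks count, the target index is unknown until the
// statuses have been inspected, so the tasks are filtered first:
val successful = tasks.filter(_.status == "SUCCESS")
val medianOfSuccessful = successful(successful.size / 2).runTime
```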
Hm, I think this reintroduces the problem that this code fixed, though. What about adding some kind of filter() method to KVStoreView? It could configure an optional Predicate<T> that is applied to iterator(). Commons Collections has a FilterIterator for this.
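A sketch of the suggestion (a hypothetical, heavily simplified view class; the real KVStoreView has no such method in the code under review): record a predicate and apply it lazily inside iterator(), in the spirit of Commons Collections' FilterIterator, so filtering composes with iteration without materializing everything.

```scala
// Hypothetical, simplified stand-in for a KVStoreView with a lazy predicate.
class View[T](underlying: Iterable[T], pred: Option[T => Boolean] = None) {
  def filter(p: T => Boolean): View[T] = new View(underlying, Some(p))
  def iterator(): Iterator[T] = {
    val it = underlying.iterator
    pred.fold(it)(p => it.filter(p))  // predicate applied during iteration
  }
}

val view = new View(Seq("SUCCESS", "FAILED", "SUCCESS")).filter(_ == "SUCCESS")
val successes = view.iterator().toSeq
```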
@vanzin Yes. It seems loading the stage page takes a lot more time if we enable the disk store.
The in-memory store seems okay. (Also, loading the Jobs page from the history server for the first time takes a long time, mostly due to event log parsing.)
Pardon if I'm missing something, but does this avoid the problem? Everything is loaded, and then it's filtered.
@srowen Yes. Everything is loaded in sorted order based on the index, and then we do the filtering. For the in-memory case, this doesn't cause any issue, but for the disk store there is extra deserialization overhead.
Maybe one possible solution for the disk store case is to bring the data only the first time and sort based on the corresponding indices to compute the quantiles.
If that solution seems too complicated, then we can tell the user that the summary metrics display the quantile summary for all the tasks, instead of only the completed ones.
Correct me if I am wrong.
I see, so there's no real way to do it right without deserializing everything, because that's the only way to know for sure which tasks are successful.
Leaving it as-is isn't so bad; if failed tasks are infrequent, then the quantiles are still about right.
I can think of fudges like searching ahead from the current index for the next successful task and using that. For the common case, that might make the quantiles a little better: iterate ahead, incrementing idx, and give up when hitting the next index.
Hm, @vanzin, is that too complex vs. taking the hit and deserializing it all? Or should we just punt on this?
I'm not sure the fudging would give you much better results. Perhaps a little bit...
As for the correct solution, I don't see a lot of good options:
- Loading all successful tasks in memory will be a little slow and memory-intensive for large stages. Using my 100k-task stage example, deserializing everything, even once, will still be a little expensive (deserializing 100k tasks vs. deserializing metricCount * quantileCount tasks, which is 100 tasks and change in the UI case).
- Changing the way the indexing works, so that you can index by specific metrics for successful and failed tasks differently, would be tricky, and would also require changing the disk store version (to invalidate old stores).
- Maybe something else I'm not able to come up with at the moment.

Unless someone has a better idea, maybe just using this code when using an in-memory store is an ok temporary "fix".
quantileTasks.map(task => fn(task._1).toDouble).toIndexedSeq
} else {
indices.map( index =>
fn(quantileTasks.filter(_._2 == index).head._1).toDouble).toIndexedSeq
.filter + .head = .find? It is more efficient. Why not create a Map, though, if this needs to be accessed by key?
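For illustration (a sketch with made-up data, not the PR's code): `.find` short-circuits at the first match, while building a `Map` once makes repeated lookups by key cheap.

```scala
// (value, index) pairs, as produced by a zipWithIndex-style pipeline.
val indexed: Seq[(Long, Int)] = Seq((10L, 0), (20L, 1), (30L, 2))

// .filter(...).head scans and builds an intermediate collection;
// .find(...) stops at the first element whose index matches.
val viaFind: Long = indexed.find(_._2 == 1).get._1

// For repeated lookups by index, build a Map once and look up by key.
val byIndex: Map[Int, Long] = indexed.map { case (v, i) => i -> v }.toMap
val viaMap: Long = byIndex(2)
```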
Modified. Thanks
test("summary should contain task metrics of only successful tasks") {
val store = new InMemoryStore()
(0 until 5).foreach { i =>
if (i % 2 == 1) store.write(failedTaskData(i))
Braces for if-else. I also personally think a for loop is more obvious here: for (i <- 0 until 5) {
Done
}
val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
val values = (0 to 2).map( i => |
2.0 is much clearer than 2.toDouble. But why not just write Array(0.0, 2.0, 4.0)?
Done
Thank you @srowen for the review. I will update the PR.
The behavior change occurred after the PR corresponding to this JIRA: https://issues.apache.org/jira/browse/SPARK-20657. Also, the PR discussion doesn't mention whether the change was intentional or not.
Test build #99238 has finished for PR 23088 at commit
OK CC @vanzin on the intent of the original change
Test build #99244 has finished for PR 23088 at commit
.asScala
.filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
.zipWithIndex
.filter(x => indices.contains(x._2))
Style for these is .filter { x => ... } (happens in a bunch of places).
I don't feel strongly about it but I have written it as above where a new block isn't needed. I don't think it makes any runtime difference.
It doesn't make a runtime difference. That's why this is a style nit.
Thanks. I have modified it.
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, stageId, attemptId)
}

private def failedTaskData(i: Int): TaskDataWrapper = {
Better to have the status be a parameter to the existing method (with a default value).
Thanks. Done.
Force-pushed from b29a150 to c81fcf3
Test build #99287 has finished for PR 23088 at commit
Test build #99289 has finished for PR 23088 at commit
Looks like the only outstanding question here is whether we can use this code only when there's a memory-backed store.
While it makes some sense, I have two concerns. First, the disk and memory stores would give different answers, even though the choice of store shouldn't really affect the results. Would a user ever have both, see them side by side, and be confused? Second, doing this cleanly seems like it should still entail pushing all the quantile logic down into the KVStore, and that's a bigger change.
I'd either punt on this part of the change, or design a change that pushes this down into the kvstore, I think.
Force-pushed from c81fcf3 to dc95355
Test build #99564 has finished for PR 23088 at commit
Test build #99566 has finished for PR 23088 at commit
Jenkins, retest this please
Test build #99577 has finished for PR 23088 at commit
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
if (store.isInstanceOf[LevelDB]) {
Does this code path need to be different for disk vs. memory? This part seemed like it could work efficiently either way.
Yes. Now, for the disk store case, it finds the total task count, and for the in-memory case, only the successful task count.
This count is used to find the quantile indices for all the task metrics:
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
For example, assume 200 tasks, out of which 100 succeeded and 100 failed:
- Disk store case: indices = [0, 50, 100, 150, 199], count = 200
- In-memory case: indices = [0, 25, 50, 75, 99], count = 100
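The quantile-index computation quoted above can be checked directly. A standalone sketch of that one line, assuming the UI's 0th/25th/50th/75th/100th percentiles:

```scala
// Quantile indices as computed by the quoted line, for the example of
// 200 total tasks of which 100 succeeded.
val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)

def quantileIndices(count: Long): Seq[Long] =
  quantiles.map { q => math.min((q * count).toLong, count - 1) }.toSeq

val diskStoreIndices = quantileIndices(200)  // counts all tasks
val inMemoryIndices = quantileIndices(100)   // counts only successful tasks
```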
@@ -221,29 +230,49 @@ private[spark] class AppStatusStore(
// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

// TODO Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
It's not ideal but it's a reasonable solution. Are you OK with it @vanzin ?
Hi @shahidki31, thank you for pointing that out. Will put up a fix for it shortly.
Thanks @pgandhi999
PR #23205 is for displaying a message when there is no data in the task summary metrics table.
I think this is ok for now, while the bigger fix is not tackled. Just a couple of minor nits.
.index(TaskIndexNames.EXEC_RUN_TIME)
.first(0L)
.closeableIterator()
if (store.isInstanceOf[LevelDB]) {
Could you invert the check so you don't need to import LevelDB? Just to avoid importing more implementation details of the kvstore module into this class...
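A sketch of what the inversion might look like (the class names below are simplified stand-ins, not the real kvstore module; the point is just to test against the in-memory type rather than the disk-backed one):

```scala
// Simplified stand-ins for the kvstore hierarchy.
trait KVStore
class InMemoryStore extends KVStore
class LevelDB extends KVStore  // disk-backed; ideally not referenced here

// Inverted check: the disk/slow path becomes the `else` branch, so this
// file only needs to name the in-memory type.
def useFullScan(store: KVStore): Boolean =
  !store.isInstanceOf[InMemoryStore]

val diskNeedsScan = useFullScan(new LevelDB)
val memoryNeedsScan = useFullScan(new InMemoryStore)
```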
Done. Thanks
@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

test("only successful tasks have taskSummary") {
val store = new InMemoryStore()
(0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
Nit: use status = "FAILED" when the param has a default value.
Done.
Also, it would be good to open a separate bug to address the fix for the SHS / disk store.
Hi @vanzin, I have opened a JIRA for the disk store case: https://issues.apache.org/jira/browse/SPARK-26260. I will try to work on it.
Test build #99635 has finished for PR 23088 at commit
Retest this please
Test build #99645 has finished for PR 23088 at commit
Test build #99653 has finished for PR 23088 at commit
Retest this please
Test build #99654 has finished for PR 23088 at commit
Merging to master / 2.4.
[SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics

## What changes were proposed in this pull request?
The task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of Spark.

## How was this patch tested?
Added UT. Attached screenshots (before and after the patch).

Closes #23088 from shahidki31/summaryMetrics.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 35f9163)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…ly successful tasks summary …sks metrics for disk store

### What changes were proposed in this pull request?
After #23088, the task summary table in the stage page shows successful tasks' metrics for the InMemory store. In this PR, the same is added for the disk store.

### Why are the changes needed?
Both the InMemory and disk stores will now be consistent in showing the task summary table in the UI when there are non-successful tasks.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added UT. Manually verified.
Test steps:
1. Add the config in spark-defaults.conf -> **spark.history.store.path /tmp/store**
2. sbin/start-history-server.sh
3. bin/spark-shell
4. `sc.parallelize(1 to 1000, 2).map(x => throw new Exception("fail")).count`



Closes #26508 from shahidki31/task.
Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
What changes were proposed in this pull request?
Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of spark.
How was this patch tested?
Added UT. Attached screenshots (before and after the patch).