Skip to content

Commit

Permalink
[SPARK-8593] [CORE] Sort app attempts by start time.
Browse files Browse the repository at this point in the history
This makes sure attempts are listed in the order they were executed, and that the
app's state matches the state of the most current attempt.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Closes #7253 from rekhajoshm/SPARK-8593 and squashes the following commits:

874dd80 [Joshi] History Server: updated order for multiple attempts(logcleaner)
716e0b1 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime)
548c753 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime)
83306a8 [Joshi] History Server: updated order for multiple attempts(descending start time)
b0fc922 [Joshi] History Server: updated order for multiple attempts(updated comment)
cc0fda7 [Joshi] History Server: updated order for multiple attempts(updated test)
304cb0b [Joshi] History Server: updated order for multiple attempts(reverted HistoryPage)
85024e8 [Joshi] History Server: updated order for multiple attempts
a41ac4b [Joshi] History Server: updated order for multiple attempts
ab65fa1 [Joshi] History Server: some attempt completed to work with showIncomplete
0be142d [Rekha Joshi] Merge pull request #3 from apache/master
106fd8e [Rekha Joshi] Merge pull request #2 from apache/master
e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
  • Loading branch information
rekhajoshm authored and srowen committed Jul 17, 2015
1 parent 8b8be1f commit 42d8a01
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Comparison function that defines the sort order for application attempts within the same
* application. Order is: running attempts before complete attempts, running attempts sorted
* by start time, completed attempts sorted by end time.
* application. Order is: attempts are sorted by descending start time.
* Most recent attempt state matches with current state of the app.
*
* Normally applications should have a single running attempt; but failure to call sc.stop()
* may cause multiple running attempts to show up.
Expand All @@ -418,11 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def compareAttemptInfo(
a1: FsApplicationAttemptInfo,
a2: FsApplicationAttemptInfo): Boolean = {
if (a1.completed == a2.completed) {
if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
} else {
!a1.completed
}
a1.startTime >= a2.startTime
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
appListAfterRename.size should be (1)
}

test("apps with multiple attempts") {
test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())

val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)

updateAndCheck(provider) { list =>
Expand All @@ -259,7 +258,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)

updateAndCheck(provider) { list =>
Expand All @@ -268,30 +267,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.head.attempts.head.attemptId should be (Some("attempt2"))
}

val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
attempt2.delete()
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
writeFile(attempt3, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)

updateAndCheck(provider) { list =>
list should not be (null)
list.size should be (1)
list.head.attempts.size should be (2)
list.head.attempts.head.attemptId should be (Some("attempt2"))
list.head.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt3"))
}

val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(attempt2, true, None,
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)

updateAndCheck(provider) { list =>
list.size should be (2)
list.head.attempts.size should be (1)
list.last.attempts.size should be (2)
list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))

list.foreach { case app =>
Expand Down

0 comments on commit 42d8a01

Please sign in to comment.