[SPARK-26186][SPARK-26184][CORE] Last updated time is not getting updated for the Inprogress application

## What changes were proposed in this pull request?

When `spark.history.fs.inProgressOptimization.enabled` is true, the last updated time of an in-progress application is not refreshed in the History UI. In addition, the cleaner removes an in-progress application from the listing even when its last updated time is within the cleaning threshold.

In this PR, if fast in-progress optimization is enabled, we set the `lastUpdated` time of the application attempt to the last scan time. This refreshes the last updated time shown in the History UI, and the cleaner no longer removes the application as long as that time is within the cleaning interval.
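The effect of the change can be illustrated with a small, self-contained model. This is only a sketch: `Attempt`, `isStale`, and `refresh` are hypothetical names that do not exist in Spark, and the staleness rule (an attempt is dropped once its last updated time is older than the current time minus the maximum log age) is an assumption about how the cleaner decides. The day values mirror those used in the unit test added by this patch.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, simplified stand-in for an attempt entry in the listing.
final case class Attempt(attemptId: Option[String], lastUpdated: Long)

object LastUpdatedSketch {
  // Assumed cleaning rule: stale once lastUpdated is older than (now - maxAge).
  def isStale(attempt: Attempt, nowMs: Long, maxAgeMs: Long): Boolean =
    attempt.lastUpdated < nowMs - maxAgeMs

  // Copy the matching attempt with lastUpdated set to the scan time;
  // all other attempts are returned unchanged.
  def refresh(
      attempts: List[Attempt],
      attemptId: Option[String],
      scanTimeMs: Long): List[Attempt] =
    attempts.map { a =>
      if (a.attemptId == attemptId) a.copy(lastUpdated = scanTimeMs) else a
    }

  val maxAgeMs: Long = TimeUnit.DAYS.toMillis(7)
  val day1: Long = TimeUnit.DAYS.toMillis(1)
  val day10: Long = TimeUnit.DAYS.toMillis(10)

  // Without the fix: lastUpdated stays at the first scan (day 1).
  val before: List[Attempt] = List(Attempt(Some("attempt1"), day1))
  // With the fix: the scan on day 10 refreshes lastUpdated.
  val after: List[Attempt] = refresh(before, Some("attempt1"), day10)

  def main(args: Array[String]): Unit = {
    println(isStale(before.head, day10, maxAgeMs)) // true: 9 days old, limit is 7
    println(isStale(after.head, day10, maxAgeMs))  // false: refreshed on day 10
  }
}
```

In this model the still-running application would be cleaned on day 10 unless each scan refreshes its timestamp, which is the situation the patch addresses.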

## How was this patch tested?
Added a unit test and attached screenshots.
Before patch:
![screenshot from 2018-11-27 23-22-38](https://user-images.githubusercontent.com/23054875/49101600-9b5a3380-f29c-11e8-8efc-3fb594e4279a.png)
![screenshot from 2018-11-27 23-20-11](https://user-images.githubusercontent.com/23054875/49101601-9c8b6080-f29c-11e8-928e-643a8c8f4477.png)

After patch:
![screenshot from 2018-11-27 23-37-10](https://user-images.githubusercontent.com/23054875/49101911-669aac00-f29d-11e8-8181-663e4a08ab0e.png)
![screenshot from 2018-11-27 23-39-04](https://user-images.githubusercontent.com/23054875/49102010-a5306680-f29d-11e8-947a-e8a2a09a785a.png)

Closes apache#23158 from shahidki31/HistoryLastUpdateTime.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
shahidki31 authored and jackylee-ch committed Feb 18, 2019
1 parent 01b09bf commit 0f83458
Showing 2 changed files with 61 additions and 0 deletions.
@@ -461,6 +461,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (info.appId.isDefined && fastInProgressParsing) {
  // When fast in-progress parsing is on, we don't need to re-parse when the
  // size changes, but we do need to invalidate any existing UIs.
  // Also, we need to update the `lastUpdated time` to display the updated time in
  // the HistoryUI and to avoid cleaning the inprogress app while running.
  val appInfo = listing.read(classOf[ApplicationInfoWrapper], info.appId.get)

  val attemptList = appInfo.attempts.map { attempt =>
    if (attempt.info.attemptId == info.attemptId) {
      new AttemptInfoWrapper(
        attempt.info.copy(lastUpdated = new Date(newLastScanTime)),
        attempt.logPath,
        attempt.fileSize,
        attempt.adminAcls,
        attempt.viewAcls,
        attempt.adminAclsGroups,
        attempt.viewAclsGroups)
    } else {
      attempt
    }
  }

  val updatedAppInfo = new ApplicationInfoWrapper(appInfo.info, attemptList)
  listing.write(updatedAppInfo)

  invalidateUI(info.appId.get, info.attemptId)
  false
} else {
@@ -334,6 +334,45 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
  assert(!log2.exists())
}

test("should not clean inprogress application with lastUpdated time less than maxTime") {
  val firstFileModifiedTime = TimeUnit.DAYS.toMillis(1)
  val secondFileModifiedTime = TimeUnit.DAYS.toMillis(6)
  val maxAge = TimeUnit.DAYS.toMillis(7)
  val clock = new ManualClock(0)
  val provider = new FsHistoryProvider(
    createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock)
  val log = newLogFile("inProgressApp1", None, inProgress = true)
  writeFile(log, true, None,
    SparkListenerApplicationStart(
      "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
  )
  clock.setTime(firstFileModifiedTime)
  log.setLastModified(clock.getTimeMillis())
  provider.checkForLogs()
  writeFile(log, true, None,
    SparkListenerApplicationStart(
      "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
    SparkListenerJobStart(0, 1L, Nil, null)
  )

  clock.setTime(secondFileModifiedTime)
  log.setLastModified(clock.getTimeMillis())
  provider.checkForLogs()
  clock.setTime(TimeUnit.DAYS.toMillis(10))
  writeFile(log, true, None,
    SparkListenerApplicationStart(
      "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
    SparkListenerJobStart(0, 1L, Nil, null),
    SparkListenerJobEnd(0, 1L, JobSucceeded)
  )
  log.setLastModified(clock.getTimeMillis())
  provider.checkForLogs()
  // This should not trigger any cleanup
  updateAndCheck(provider) { list =>
    list.size should be(1)
  }
}

test("log cleaner for inProgress files") {
  val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
  val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)