Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26186][SPARK-26184][CORE] Last updated time is not getting updated for the Inprogress application #23158

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (info.appId.isDefined && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
// Also, we need to update the `lastUpdated time` to display the updated time in
// the HistoryUI and to avoid cleaning the inprogress app while running.
val appInfo = listing.read(classOf[ApplicationInfoWrapper], info.appId.get)

val attemptList = appInfo.attempts.map { attempt =>
if (attempt.info.attemptId == info.attemptId) {
new AttemptInfoWrapper(
attempt.info.copy(lastUpdated = new Date(newLastScanTime)),
attempt.logPath,
attempt.fileSize,
attempt.adminAcls,
attempt.viewAcls,
attempt.adminAclsGroups,
attempt.viewAclsGroups)
} else {
attempt
}
}

val updatedAppInfo = new ApplicationInfoWrapper(appInfo.info, attemptList)
listing.write(updatedAppInfo)

invalidateUI(info.appId.get, info.attemptId)
false
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,45 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}

test("should not clean inprogress application with lastUpdated time less than maxTime") {
val firstFileModifiedTime = TimeUnit.DAYS.toMillis(1)
val secondFileModifiedTime = TimeUnit.DAYS.toMillis(6)
val maxAge = TimeUnit.DAYS.toMillis(7)
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock)
val log = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
)
clock.setTime(firstFileModifiedTime)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to set the log file's modified time before calling this, otherwise the cleaner won't be checking what you expect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added Thanks.
But for inProgress application, do we really need to set log file's last modified time, as the cleaner check only the application's lastUpdated time, which we update whenever size of the logFile changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps. Better to be consistent with other tests. Also because you're using a manual clock, and otherwise your mod times will be way higher than the clock's time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, Thanks.

writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null)
)

clock.setTime(secondFileModifiedTime)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
clock.setTime(TimeUnit.DAYS.toMillis(10))
writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null),
SparkListenerJobEnd(0, 1L, JobSucceeded)
)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
// This should not trigger any cleanup
updateAndCheck(provider) { list =>
list.size should be(1)
}
}

test("log cleaner for inProgress files") {
val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
Expand Down