diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index e6df260bdeaa3..d35d8606eb4b4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -474,9 +474,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
+      // Mark entries that are processing as not stale. Such entries do not have a chance to be
+      // updated with the new 'lastProcessed' time and thus any entity that completes processing
+      // right after this check and before the check for stale entities will be identified as stale
+      // and will be deleted from the UI until the next 'checkForLogs' run.
+      val notStale = mutable.HashSet[String]()
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry => isAccessible(entry.getPath) }
-        .filter { entry => !isProcessing(entry.getPath) }
+        .filter { entry =>
+          if (isProcessing(entry.getPath)) {
+            notStale.add(entry.getPath.toString())
+            false
+          } else {
+            true
+          }
+        }
         .flatMap { entry => EventLogFileReader(fs, entry) }
         .filter { reader =>
           try {
@@ -576,12 +588,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .last(newLastScanTime - 1)
         .asScala
         .toList
-      stale.filterNot(isProcessing).foreach { log =>
-        log.appId.foreach { appId =>
-          cleanAppData(appId, log.attemptId, log.logPath)
-          listing.delete(classOf[LogInfo], log.logPath)
+      stale.filterNot(isProcessing)
+        .filterNot(info => notStale.contains(info.logPath))
+        .foreach { log =>
+          log.appId.foreach { appId =>
+            cleanAppData(appId, log.attemptId, log.logPath)
+            listing.delete(classOf[LogInfo], log.logPath)
+          }
         }
-      }
 
       lastScanTime.set(newLastScanTime)
     } catch {
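
To make the intent of the change easier to see in isolation, here is a minimal, self-contained Scala sketch of the same pattern: while listing, entries that are still being processed are recorded in a notStale set, and the stale-cleanup pass then skips both entries that are processing now and entries that were processing when the listing ran. All names below (NotStaleSketch, processing, listing, the simplified checkForLogs signature) are illustrative stand-ins, not the actual FsHistoryProvider members.

import scala.collection.mutable

object NotStaleSketch {
  // Hypothetical stand-ins for the provider's state; not the real FsHistoryProvider fields.
  private val processing = mutable.HashSet[String]()   // log paths currently being parsed
  private val listing = mutable.Map[String, Long]()    // log path -> lastProcessed timestamp

  def isProcessing(path: String): Boolean = processing.contains(path)

  // Returns the paths deleted by this scan.
  def checkForLogs(allPaths: Seq[String], newLastScanTime: Long): Seq[String] = {
    // Phase 1: skip in-flight entries, but remember them so the stale-cleanup phase
    // below does not mistake them for abandoned logs (their 'lastProcessed' time is
    // not refreshed during this scan).
    val notStale = mutable.HashSet[String]()
    val updated = allPaths.filter { path =>
      if (isProcessing(path)) {
        notStale.add(path)
        false
      } else {
        true
      }
    }
    updated.foreach(path => listing(path) = newLastScanTime)

    // Phase 2: entries not touched by this scan look stale, but anything that is
    // still processing, or was processing when phase 1 ran, must survive until the
    // next scan can refresh its timestamp.
    val stale = listing.collect { case (path, ts) if ts < newLastScanTime => path }.toList
    val toDelete = stale
      .filterNot(isProcessing)
      .filterNot(notStale.contains)
    toDelete.foreach(listing.remove)
    toDelete
  }
}

Under these assumptions, the second filterNot is what the patch adds: without it, a log whose processing finishes between the listing phase and the cleanup phase would pass filterNot(isProcessing) and be deleted from the UI until the next checkForLogs run.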