From 50d5c23e15e1e535e55122f94456d366a8a1765d Mon Sep 17 00:00:00 2001 From: Vlad Glinsky Date: Fri, 18 Dec 2020 15:16:45 -0800 Subject: [PATCH] [SPARK-33841][CORE][3.1] Fix issue with jobs disappearing intermittently from the SHS under high load ### What changes were proposed in this pull request? Mark SHS event log entries that were `processing` at the beginning of the `checkForLogs` run as not stale and check for this mark before deleting an event log. This fixes the issue when a particular job was displayed in the SHS and disappeared after some time, but then, in several minutes showed up again. ### Why are the changes needed? The issue is caused by [SPARK-29043](https://issues.apache.org/jira/browse/SPARK-29043), which is designated to improve the concurrent performance of the History Server. The [change](https://github.com/apache/spark/pull/25797/files#) breaks the ["app deletion" logic](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R563) because of missing proper synchronization for `processing` event log entries. Since SHS now [filters out](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) all `processing` event log entries, such entries do not have a chance to be [updated with the new `lastProcessed`](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R472) time and thus any entity that completes processing right after [filtering](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) and before [the check for stale entities](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R560) will be identified as stale and will be deleted from the UI until the next `checkForLogs` run. This is because [updated `lastProcessed` time is used as criteria](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R557), and event log entries that missed to be updated with a new time, will match that criteria. The issue can be reproduced by generating a big number of event logs and uploading them to the SHS event log directory on S3. Essentially, around 439(49.6 MB) copies of an event log file were created using [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/branch-3.1) script. Strange behavior of SHS counting the total number of applications was noticed - at first, the number was increasing as expected, but with the next page refresh, the total number of applications decreased. No errors were logged by SHS. 252 entities are displayed at `21:20:23`: ![1-252-entries-at-21-20](https://user-images.githubusercontent.com/61428392/102653857-40901f00-4178-11eb-9d61-6a20e359abb2.png) 178 entities are displayed at `21:22:15`: ![2-178-at-21-22](https://user-images.githubusercontent.com/61428392/102653900-530a5880-4178-11eb-94fb-3f28b082b25a.png) ### Does this PR introduce _any_ user-facing change? Yes, SHS users won't face the behavior when the number of displayed applications decreases periodically. ### How was this patch tested? Tested using [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/branch-3.1) script: * Build SHS with the proposed change * Download Hadoop AWS and AWS Java SDK * Prepare S3 bucket and user for programmatic access, grant required roles to the user. Get access key and secret key * Configure SHS to read event logs from S3 * Start [monitor](https://github.com/vladhlinsky/shs-monitor/blob/branch-3.1/monitor.sh) script to query SHS API * Run [producers](https://github.com/vladhlinsky/shs-monitor/blob/branch-3.1/producer.sh) * Wait for SHS to load all the applications * Verify that the number of loaded applications increases continuously over time For more details, please refer to the [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/branch-3.1) repository. Closes #30847 from vladhlinsky/SPARK-33841-branch-3.1. Authored-by: Vlad Glinsky Signed-off-by: Dongjoon Hyun --- .../deploy/history/FsHistoryProvider.scala | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e6df260bdeaa3..d35d8606eb4b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -474,9 +474,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newLastScanTime = clock.getTimeMillis() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") + // Mark entries that are processing as not stale. Such entries do not have a chance to be + // updated with the new 'lastProcessed' time and thus any entity that completes processing + // right after this check and before the check for stale entities will be identified as stale + // and will be deleted from the UI until the next 'checkForLogs' run. + val notStale = mutable.HashSet[String]() val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => isAccessible(entry.getPath) } - .filter { entry => !isProcessing(entry.getPath) } + .filter { entry => + if (isProcessing(entry.getPath)) { + notStale.add(entry.getPath.toString()) + false + } else { + true + } + } .flatMap { entry => EventLogFileReader(fs, entry) } .filter { reader => try { @@ -576,12 +588,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .last(newLastScanTime - 1) .asScala .toList - stale.filterNot(isProcessing).foreach { log => - log.appId.foreach { appId => - cleanAppData(appId, log.attemptId, log.logPath) - listing.delete(classOf[LogInfo], log.logPath) + stale.filterNot(isProcessing) + .filterNot(info => notStale.contains(info.logPath)) + .foreach { log => + log.appId.foreach { appId => + cleanAppData(appId, log.attemptId, log.logPath) + listing.delete(classOf[LogInfo], log.logPath) + } } - } lastScanTime.set(newLastScanTime) } catch {