From 373f3b942e088b53df6a3bf4547a27a0d4001621 Mon Sep 17 00:00:00 2001
From: xukun 00228947
Date: Wed, 25 Feb 2015 17:36:01 +0800
Subject: [PATCH] fix issue

---
 .../scala/org/apache/spark/SparkConf.scala    |  6 +-
 .../deploy/history/FsHistoryProvider.scala    | 56 ++++++++-----------
 docs/monitoring.md                            | 10 ++--
 3 files changed, 33 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0dbd26146cb13..249dfc04077a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,11 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
         "1.3"),
       DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."))
+        "Use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.history.fs.updateInterval", "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead."),
+      DeprecatedConfig("spark.history.updateInterval", "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead."))
     configs.map { x => (x.oldName, x) }.toMap
   }
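For context on the SparkConf change above: registered DeprecatedConfig entries are keyed by old name, and SparkConf.translateConfKey looks an old key up, warns, and returns the replacement key. The sketch below is a minimal, self-contained approximation of that lookup, with simplified names and println standing in for Spark's logging; it is illustrative, not Spark's actual implementation.

// Minimal sketch of deprecated-key translation (illustrative, not Spark's real code).
case class DeprecatedConfig(oldName: String, newName: String,
    version: String, deprecationMessage: String)

object ConfKeyTranslation {
  private val deprecated: Map[String, DeprecatedConfig] = Seq(
    DeprecatedConfig("spark.history.fs.updateInterval",
      "spark.history.fs.update.interval.seconds", "1.3",
      "Use spark.history.fs.update.interval.seconds instead."),
    DeprecatedConfig("spark.history.updateInterval",
      "spark.history.fs.update.interval.seconds", "1.3",
      "Use spark.history.fs.update.interval.seconds instead.")
  ).map(c => c.oldName -> c).toMap

  /** Map a possibly deprecated key to its current name, optionally warning. */
  def translateConfKey(key: String, warn: Boolean = false): String =
    deprecated.get(key) match {
      case Some(c) =>
        if (warn) {
          println(s"${c.oldName} is deprecated as of ${c.version}. ${c.deprecationMessage}")
        }
        c.newName
      case None => key
    }
}

Note that the helper returns a key, not a value, which is why the caller in FsHistoryProvider below wraps the result in conf.getOption before parsing it as an integer.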
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 7f2f16c6c466f..bf7b9f97c00a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -47,27 +47,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   private val NOT_STARTED = "<Not Started>"
 
-  // One day
-  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
-
-  // One week
-  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
-
-  private def warnUpdateInterval(key: String, value: String): String = {
-    logWarning(s"Using $key to set interval " +
-      "between each check for event log updates is deprecated, " +
-      "please use spark.history.fs.update.interval.seconds instead.")
-    value
-  }
-
-  private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = {
-    conf.getOption(key).map(warnUpdateInterval(key, _))
-  }
-
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(getDeprecatedConfig(conf, "spark.history.fs.updateInterval"))
-    .orElse(getDeprecatedConfig(conf, "spark.history.updateInterval"))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
     .map(_.toInt)
     .getOrElse(10) * 1000
 
@@ -81,6 +64,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
+  // Used by both the event log check thread and the log clean thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
   // and applications between check task and clean task.
   private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
@@ -103,15 +87,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically do something about event log.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
-    val runnable = new Runnable() {
+    new Runnable() {
       override def run() = Utils.logUncaughtExceptions {
         operateFun()
       }
     }
-    runnable
   }
 
   initialize()
 
@@ -137,7 +121,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
         TimeUnit.MILLISECONDS)
 
-      if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
+      if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
         pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
           TimeUnit.MILLISECONDS)
@@ -253,7 +237,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs() = {
+  private def cleanLogs(): Unit = {
     try {
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
@@ -261,27 +245,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
       val now = System.currentTimeMillis()
 
-      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
         if (now - info.lastUpdated <= maxAge) {
-          newApps += (info.id -> info)
+          appsToRetain += (info.id -> info)
         }
       }
 
-      val oldIterator = applications.values.iterator.buffered
-      oldIterator.foreach(addIfNotExpire)
-
-      applications = newApps
+      applications = appsToRetain
 
       // Scan all logs from the log directory.
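Two details in the hunks above are easy to miss: the scheduled pool is deliberately sized at one, so the log check task and the log clean task can never run concurrently and may share fs and applications without locking, and both tasks go through the same getRunner wrapper. Below is a minimal, runnable sketch of that pattern with placeholder task bodies and no Spark dependencies; all names here are illustrative.

import java.util.concurrent.{Executors, TimeUnit}

object SingleThreadSchedulingSketch extends App {
  // A pool of size one serializes the periodic tasks: "clean" can only
  // start after a "check" run has finished, and vice versa, so the tasks
  // may safely share mutable state without locks.
  val pool = Executors.newScheduledThreadPool(1)

  // Mirrors getRunner: wrap an arbitrary operation as a Runnable.
  def getRunner(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit = operateFun()
  }

  pool.scheduleAtFixedRate(getRunner(() => println("checking for new event logs")),
    0, 1, TimeUnit.SECONDS)
  pool.scheduleAtFixedRate(getRunner(() => println("cleaning expired event logs")),
    0, 5, TimeUnit.SECONDS)

  Thread.sleep(6000) // let a few iterations run, then stop
  pool.shutdown()
}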
-      // Only directories older than now maxAge milliseconds mill will be deleted
+      // Only directories older than the specified max age will be deleted.
       statusList.foreach { dir =>
         try {
           if (now - dir.getModificationTime() > maxAge) {
+            // If the path is a directory and recursive is set to true,
+            // the directory is deleted; otherwise an exception is thrown.
             fs.delete(dir.getPath, true)
           }
         } catch {
-           case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
         }
       }
     } catch {
@@ -410,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 35a30a8264665..37ede476c187d 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -86,7 +86,7 @@ follows:
   <tr>
-    <td>spark.history.fs.interval.seconds</td>
+    <td>spark.history.fs.update.interval.seconds</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history server is updated.
     </td>
   </tr>
@@ -146,7 +146,7 @@ follows:
   <tr>
-    <td>spark.history.fs.cleaner.enable</td>
+    <td>spark.history.fs.cleaner.enabled</td>
     <td>false</td>
     <td>
       Specifies whether the History Server should periodically clean up event logs from storage.
     </td>
   </tr>
@@ -156,16 +156,16 @@ follows:
   <tr>
     <td>spark.history.fs.cleaner.interval.seconds</td>
     <td>86400</td>
     <td>
-      How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day).
+      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
       Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.cleaner.maxAge.seconds</td>
-    <td>604800</td>
+    <td>3600 * 24 * 7</td>
     <td>
       Job history files older than this many seconds will be deleted when the history cleaner runs.
-      Defaults to 604800 (1 week).
+      Defaults to 3600 * 24 * 7 (1 week).
     </td>
   </tr>
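Putting the renamed keys together, enabling the cleaner under the documented defaults could look like the sketch below. The values simply restate the defaults from the table above (with the cleaner switched on); they are examples, not recommendations.

import org.apache.spark.SparkConf

object HistoryServerConfExample {
  val conf: SparkConf = new SparkConf()
    // Poll the event log directory every 10 seconds (the documented default).
    .set("spark.history.fs.update.interval.seconds", "10")
    // The cleaner is disabled by default; turn it on explicitly.
    .set("spark.history.fs.cleaner.enabled", "true")
    // Run the cleaner once a day (86400 seconds).
    .set("spark.history.fs.cleaner.interval.seconds", "86400")
    // Keep one week of history: 3600 * 24 * 7 = 604800 seconds.
    .set("spark.history.fs.cleaner.maxAge.seconds", (3600 * 24 * 7).toString)
}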