Commit 373f3b9

fix issue

viper-kun committed Feb 25, 2015
1 parent 71782b5 commit 373f3b9

Showing 3 changed files with 33 additions and 39 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,11 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
"Use spark.{driver,executor}.userClassPathFirst instead."))
"Use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.history.fs.updateInterval", "spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"),
DeprecatedConfig("spark.history.updateInterval", "spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"))
configs.map { x => (x.oldName, x) }.toMap
}

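Note: the hunk above registers the old history-server keys in SparkConf's deprecation table, which is keyed by old name. A minimal, hedged sketch of how such a table can be built and consulted — the types and warning text here are illustrative, not Spark's actual internals:

    // Sketch: a deprecation table keyed by old config name (illustrative,
    // not Spark's real DeprecatedConfig, which also allows a null newName).
    case class DeprecatedConfig(oldName: String, newName: String, version: String,
        message: String = "")

    val deprecated: Map[String, DeprecatedConfig] = Seq(
      DeprecatedConfig("spark.history.fs.updateInterval",
        "spark.history.fs.update.interval.seconds", "1.3",
        "Use spark.history.fs.update.interval.seconds instead"),
      DeprecatedConfig("spark.history.updateInterval",
        "spark.history.fs.update.interval.seconds", "1.3",
        "Use spark.history.fs.update.interval.seconds instead")
    ).map(c => (c.oldName, c)).toMap

    // Translate an old key to its replacement, optionally warning the caller.
    def translateConfKey(key: String, warn: Boolean = false): String =
      deprecated.get(key) match {
        case Some(c) =>
          if (warn) Console.err.println(
            s"'$key' is deprecated as of ${c.version}. ${c.message}")
          c.newName
        case None => key
      }

Here translateConfKey("spark.history.updateInterval", warn = true) returns the new key and emits a warning; unknown keys pass through untouched.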
56 changes: 23 additions & 33 deletions core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -47,27 +47,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider

   private val NOT_STARTED = "<Not Started>"
 
-  // One day
-  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
-
-  // One week
-  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
-
-  private def warnUpdateInterval(key: String, value: String): String = {
-    logWarning(s"Using $key to set interval " +
-      "between each check for event log updates is deprecated, " +
-      "please use spark.history.fs.update.interval.seconds instead.")
-    value
-  }
-
-  private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = {
-    conf.getOption(key).map(warnUpdateInterval(key, _))
-  }
-
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(getDeprecatedConfig(conf, "spark.history.fs.updateInterval"))
-    .orElse(getDeprecatedConfig(conf, "spark.history.updateInterval"))
+    .orElse(Some(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(Some(SparkConf.translateConfKey("spark.history.updateInterval", true)))
     .map(_.toInt)
     .getOrElse(10) * 1000
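Note: UPDATE_INTERVAL_MS resolves the new key first and only then falls back to the deprecated ones. A self-contained sketch of that Option.orElse fallback pattern, with a plain Map standing in for SparkConf (all names and values illustrative):

    // A plain Map stands in for SparkConf; only the old key is set here.
    val settings = Map("spark.history.updateInterval" -> "30")

    val updateIntervalMs: Int = settings.get("spark.history.fs.update.interval.seconds")
      .orElse(settings.get("spark.history.fs.updateInterval"))  // deprecated key #1
      .orElse(settings.get("spark.history.updateInterval"))     // deprecated key #2
      .map(_.toInt)         // parse only a value that was actually found
      .getOrElse(10) * 1000 // default 10 seconds, then convert to milliseconds

    assert(updateIntervalMs == 30000) // the old key is still honored

Parsing inside map keeps the default path free of string conversion; when no key is set, the chain simply falls through to 10.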

@@ -81,6 +64,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider

   private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
+  // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
   // and applications between check task and clean task.
   private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
@@ -103,15 +87,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically do something about event log.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
-    val runnable = new Runnable() {
+    new Runnable() {
       override def run() = Utils.logUncaughtExceptions {
         operateFun()
       }
     }
-    runnable
   }
 
   initialize()
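Note: getRunner exists because a periodic task on a ScheduledExecutorService is silently cancelled if its run() throws; Spark guards against that with Utils.logUncaughtExceptions. A standalone sketch of the same guard, with the logging simplified:

    // Wrap an operation so exceptions are logged instead of escaping run().
    def getRunner(operateFun: () => Unit): Runnable = new Runnable() {
      override def run(): Unit =
        try {
          operateFun()
        } catch {
          // An escaping exception would cancel the periodic schedule.
          case t: Throwable => Console.err.println(s"Uncaught exception: $t")
        }
    }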
@@ -137,7 +121,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
       TimeUnit.MILLISECONDS)
 
-    if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
+    if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
       // A task that periodically cleans event logs on disk.
       pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
         TimeUnit.MILLISECONDS)
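Note: both tasks run on the single-threaded pool created earlier, so the log check and the cleaner are serialized and can safely share the FileSystem handle and the application map. A hedged sketch of that scheduling pattern (intervals and task bodies are placeholders, not Spark's):

    import java.util.concurrent.{Executors, TimeUnit}

    // Pool size 1: checkForLogs and cleanLogs can never run concurrently.
    val pool = Executors.newScheduledThreadPool(1)

    val checkForLogs: Runnable = () => println("scanning event logs...")  // placeholder
    val cleanLogs: Runnable = () => println("deleting expired logs...")   // placeholder

    pool.scheduleAtFixedRate(checkForLogs, 0L, 10L, TimeUnit.SECONDS)      // update interval
    pool.scheduleAtFixedRate(cleanLogs, 0L, 24L * 3600L, TimeUnit.SECONDS) // cleaner interval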
@@ -253,35 +237,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs() = {
+  private def cleanLogs(): Unit = {
     try {
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
       val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
         DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
 
       val now = System.currentTimeMillis()
-      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
         if (now - info.lastUpdated <= maxAge) {
-          newApps += (info.id -> info)
+          appsToRetain += (info.id -> info)
         }
       }
 
-      val oldIterator = applications.values.iterator.buffered
-      oldIterator.foreach(addIfNotExpire)
-
-      applications = newApps
+      applications = appsToRetain
 
       // Scan all logs from the log directory.
-      // Only directories older than now maxAge milliseconds mill will be deleted
+      // Only directories older than the specified max age will be deleted
       statusList.foreach { dir =>
         try {
           if (now - dir.getModificationTime() > maxAge) {
+            // if path is a directory and set to true,
+            // the directory is deleted else throws an exception
             fs.delete(dir.getPath, true)
           }
         } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
         }
       }
     } catch {
@@ -410,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider

 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
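Note: the retention pass in cleanLogs keeps an application only while now - lastUpdated <= maxAge, using a LinkedHashMap to preserve listing order. A self-contained sketch, with FsApplicationHistoryInfo reduced to the two fields the filter reads (data is illustrative):

    import scala.collection.mutable
    import scala.concurrent.duration._

    case class AppInfo(id: String, lastUpdated: Long) // stand-in for FsApplicationHistoryInfo

    val maxAge = 7.days.toMillis // DEFAULT_SPARK_HISTORY_FS_MAXAGE_S, in milliseconds
    val now = System.currentTimeMillis()

    val applications = Seq(
      AppInfo("app-1", now - 1.day.toMillis),  // fresh: retained
      AppInfo("app-2", now - 30.days.toMillis) // expired: dropped
    )

    // LinkedHashMap keeps insertion order, mirroring the provider's app listing.
    val appsToRetain = new mutable.LinkedHashMap[String, AppInfo]()
    applications.foreach { info =>
      if (now - info.lastUpdated <= maxAge) {
        appsToRetain += (info.id -> info)
      }
    }

    assert(appsToRetain.keys.toSeq == Seq("app-1"))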
10 changes: 5 additions & 5 deletions docs/monitoring.md
@@ -86,7 +86,7 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.interval.seconds</td>
+    <td>spark.history.fs.update.interval.seconds</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history server is updated.
@@ -146,7 +146,7 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.cleaner.enable</td>
+    <td>spark.history.fs.cleaner.enabled</td>
     <td>false</td>
     <td>
       Specifies whether the History Server should periodically clean up event logs from storage.
@@ -156,16 +156,16 @@ follows:
     <td>spark.history.fs.cleaner.interval.seconds</td>
     <td>86400</td>
     <td>
-      How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day).
+      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
       Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.cleaner.maxAge.seconds</td>
-    <td>604800</td>
+    <td>3600 * 24 * 7</td>
     <td>
       Job history files older than this many seconds will be deleted when the history cleaner runs.
-      Defaults to 604800 (1 week).
+      Defaults to 3600 * 24 * 7 (1 week).
     </td>
   </tr>
 </table>
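Note: after this commit the documented keys are the renamed ones. A hedged example of setting them programmatically through SparkConf (a real deployment would normally put the same pairs in spark-defaults.conf; the values are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.history.fs.update.interval.seconds", "10") // refresh the app listing every 10s
      .set("spark.history.fs.cleaner.enabled", "true")       // note: "enabled", not the old "enable"
      .set("spark.history.fs.cleaner.interval.seconds", "86400")                // scan once a day
      .set("spark.history.fs.cleaner.maxAge.seconds", (3600 * 24 * 7).toString) // keep one week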
