Report application start/end times to HistoryServer
This involves adding application start and end events. It also allows us to
record the actual app name instead of simply using the name of the log
directory.
andrewor14 committed Mar 21, 2014
1 parent 8aac163 commit 7584418
Showing 21 changed files with 250 additions and 63 deletions.
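
In outline, the commit defines two new lifecycle events and posts them from SparkContext: the start event during construction, and the end event from stop(). Below is a condensed, self-contained Scala sketch of that flow, simplified from the diff that follows (the buffer-backed post() is a stand-in for the real listener bus, not Spark's actual classes):

// Condensed sketch of the flow this commit adds (simplified; illustration only).
sealed trait SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, time: Long) extends SparkListenerEvent
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

object FlowSketch extends App {
  val delivered = scala.collection.mutable.Buffer[SparkListenerEvent]()
  // Stand-in for LiveListenerBus.post; blocking = true models the synchronous path.
  def post(event: SparkListenerEvent, blocking: Boolean = false): Unit = delivered += event

  // SparkContext posts the start event during construction...
  post(SparkListenerApplicationStart("my app", System.currentTimeMillis))
  // ...and the end event in stop(), blocking so listeners see it before shutting down.
  post(SparkListenerApplicationEnd(System.currentTimeMillis), blocking = true)
  delivered.foreach(println)
}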
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -228,6 +228,7 @@ class SparkContext(
dagScheduler.start()

postEnvironmentUpdate()
postApplicationStart()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
@@ -826,6 +827,7 @@ class SparkContext(

/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
@@ -1066,6 +1068,20 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Post the application start event */
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, startTime))
}

/**
* Post the application end event to all listeners immediately, rather than adding it
* to the event queue for it to be asynchronously processed eventually. Otherwise, a race
* condition exists in which the listeners may stop before this event has been propagated.
*/
private def postApplicationEnd() {
listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis), blocking = true)
}

/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}

/**
* A web server that re-renders SparkUIs of finished applications.
@@ -59,11 +59,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
(request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
)

// A mapping from an event log path to the associated, already rendered, SparkUI
val logPathToUI = mutable.HashMap[String, SparkUI]()

// A mapping from an event log path to a timestamp of when it was last updated
val logPathToLastUpdated = mutable.HashMap[String, Long]()
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

/** Bind to the HTTP server behind this web interface */
override def bind() {
@@ -78,6 +74,12 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
checkForLogs()
}

/** Parse app ID from the given log path. */
def getAppId(logPath: String): String = logPath.split("/").last

/** Return the address of this server. */
def getAddress = "http://" + host + ":" + boundPort

/**
* Check for any updated event logs.
*
@@ -92,46 +94,56 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
// Render any missing or outdated SparkUI
logDirs.foreach { dir =>
val path = dir.getPath.toString
val lastUpdated = dir.getModificationTime
if (!logPathToLastUpdated.contains(path) ||
logPathToLastUpdated.getOrElse(path, -1L) < lastUpdated) {
maybeRenderUI(path, lastUpdated)
val appId = getAppId(path)
val lastUpdated = {
val logFiles = fileSystem.listStatus(dir.getPath)
if (logFiles != null) logFiles.map(_.getModificationTime).max else dir.getModificationTime
}
if (!appIdToInfo.contains(appId) || appIdToInfo(appId).lastUpdated < lastUpdated) {
maybeRenderUI(appId, path, lastUpdated)
}
}

// Remove any outdated SparkUIs
val logPaths = logDirs.map(_.getPath.toString)
logPathToUI.foreach { case (path, ui) =>
if (!logPaths.contains(path)) {
detachUI(ui)
logPathToUI.remove(path)
logPathToLastUpdated.remove(path)
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
}
}
}

/** Attempt to render a new SparkUI from event logs residing in the given log directory. */
def maybeRenderUI(logPath: String, lastUpdated: Long) {
val appName = getAppName(logPath)
private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
val replayBus = new ReplayListenerBus(conf)
val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName))
val appListener = new ApplicationListener
replayBus.addListener(appListener)
val ui = new SparkUI(conf, replayBus, appId, "/history/%s".format(appId))

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
val success = replayBus.replay(logPath)
if (success) {
attachUI(ui)
logPathToUI(logPath) = ui
logPathToLastUpdated(logPath) = lastUpdated
if (!appListener.started) {
logWarning("Application has event logs but has not started: %s".format(appId))
}
val appName = appListener.appName
val startTime = appListener.startTime
val endTime = appListener.endTime
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)

// If the UI already exists, terminate it and replace it
appIdToInfo.remove(appId).foreach { info => detachUI(info.ui) }
appIdToInfo(appId) = info

// Use mnemonic original app name rather than app ID
val originalAppName = "%s (history)".format(appName)
ui.setAppName(originalAppName)
}
}

/** Parse app name from the given log path. */
def getAppName(logPath: String): String = logPath.split("/").last

/** Return the address of this server. */
def getAddress = "http://" + host + ":" + boundPort

}

object HistoryServer {
@@ -147,3 +159,14 @@ object HistoryServer {
while(true) { Thread.sleep(Int.MaxValue) }
}
}

private[spark] case class ApplicationHistoryInfo(
name: String,
startTime: Long,
endTime: Long,
lastUpdated: Long,
logPath: String,
ui: SparkUI) {
def started = startTime != -1
def finished = endTime != -1
}
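
ApplicationHistoryInfo uses -1 as a sentinel for a time that has not been observed yet, which keeps started and finished trivial to compute. A tiny self-contained illustration of the same convention (the real class also carries the log path and the rendered SparkUI):

// Simplified mirror of ApplicationHistoryInfo's sentinel convention (illustration only).
case class AppInfo(name: String, startTime: Long, endTime: Long) {
  def started = startTime != -1
  def finished = endTime != -1
}

object SentinelDemo extends App {
  // Hypothetical app whose end event has not been replayed yet:
  val inFlight = AppInfo("word count", startTime = System.currentTimeMillis, endTime = -1L)
  println(inFlight.started)   // true
  println(inFlight.finished)  // false, so IndexPage renders "Not finished" and "---"
}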
47 changes: 30 additions & 17 deletions core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
@@ -23,22 +23,18 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{SparkUI, UIUtils}
import org.apache.spark.deploy.DeployWebUI
import org.apache.spark.ui.UIUtils

private[spark] class IndexPage(parent: HistoryServer) {
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

def render(request: HttpServletRequest): Seq[Node] = {
// Check if logs have been updated
parent.checkForLogs()

// Populate app table, with most recently modified first
val appRows = parent.logPathToLastUpdated.toSeq
.sortBy { case (path, lastUpdated) => -lastUpdated }
.map { case (path, lastUpdated) =>
// (appName, lastUpdated, UI)
(parent.getAppName(path), lastUpdated, parent.logPathToUI(path))
}
// Populate app table, with most recently modified app first
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)

val content =
@@ -54,14 +50,31 @@ private[spark] class IndexPage(parent: HistoryServer) {
UIUtils.basicSparkPage(content, "History Server")
}

private val appHeader = Seq[String]("App Name", "Last Updated")
private val appHeader = Seq(
"App Name",
"Started",
"Finished",
"Duration",
"Log Directory",
"Last Updated")

private def appRow(info: (String, Long, SparkUI)): Seq[Node] = {
info match { case (appName, lastUpdated, ui) =>
<tr>
<td><a href={parent.getAddress + ui.basePath}>{appName}</a></td>
<td>{dateFmt.format(new Date(lastUpdated))}</td>
</tr>
}
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else parent.getAppId(info.logPath)
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) dateFmt.format(new Date(info.startTime)) else "Not started"
val endTime = if (info.finished) dateFmt.format(new Date(info.endTime)) else "Not finished"
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
val duration = if (difference > 0) DeployWebUI.formatDuration(difference) else "---"
val logDirectory = parent.getAppId(info.logPath)
val lastUpdated = dateFmt.format(new Date(info.lastUpdated))

<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
<td>{logDirectory}</td>
<td>{lastUpdated}</td>
</tr>
}
}
core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

/**
* A simple listener for application events.
*
* This listener assumes at most one of each of SparkListenerApplicationStart and
* SparkListenerApplicationEnd will be received. Otherwise, only the latest event
* of each type will take effect.
*/
private[spark] class ApplicationListener extends SparkListener {
var appName = "<Not Started>"
var startTime = -1L
var endTime = -1L

def started = startTime != -1

def finished = endTime != -1

def duration: Long = {
val difference = endTime - startTime
if (started && finished && difference > 0) difference else -1L
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = applicationStart.appName
startTime = applicationStart.time
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
endTime = applicationEnd.time
}
}
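
Usage mirrors maybeRenderUI above: attach an ApplicationListener to a ReplayListenerBus, replay the logs, then read off the metadata. A minimal sketch, assuming a Spark build containing this commit on the classpath and a hypothetical log directory:

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}

object ReplayDemo extends App {
  val conf = new SparkConf()
  val replayBus = new ReplayListenerBus(conf)
  val appListener = new ApplicationListener
  replayBus.addListener(appListener)

  val logPath = "/tmp/spark-events/app-1234"  // hypothetical event log directory
  // replay() returns true on success, as in HistoryServer.maybeRenderUI above.
  if (replayBus.replay(logPath) && appListener.started) {
    println("%s ran for %d ms".format(appListener.appName, appListener.duration))
  }
}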
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -90,6 +90,10 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
logEvent(event, flushLogger = true)
override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
logEvent(event, flushLogger = true)
override def onApplicationStart(event: SparkListenerApplicationStart) =
logEvent(event, flushLogger = true)
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
logEvent(event, flushLogger = true)

def stop() = logger.stop()
}
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -64,13 +64,18 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}.start()
}

def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
queueFullErrorMessageLogged = true
def post(event: SparkListenerEvent, blocking: Boolean = false) {
if (!blocking) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
queueFullErrorMessageLogged = true
}
} else {
// Bypass the event queue and post to all attached listeners immediately
postToAll(event)
}
}

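
The blocking flag is what postApplicationEnd in SparkContext relies on: instead of the usual fire-and-forget enqueue, the event is handed straight to postToAll before post() returns, so the listeners cannot be stopped before seeing it. A toy model of the two delivery paths (hypothetical names, not the real class):

import java.util.concurrent.LinkedBlockingQueue

// Toy model of LiveListenerBus's two delivery paths (illustration only).
class ToyListenerBus {
  private val eventQueue = new LinkedBlockingQueue[String](10000)
  private var listeners = List.empty[String => Unit]

  def addListener(listener: String => Unit): Unit = listeners ::= listener
  def postToAll(event: String): Unit = listeners.foreach(_.apply(event))

  def post(event: String, blocking: Boolean = false): Unit = {
    if (blocking) {
      postToAll(event)  // deliver synchronously before returning
    } else if (!eventQueue.offer(event)) {
      // The real bus logs an error once; a consumer thread drains eventQueue there.
      println("Dropping event: queue is full")
    }
  }
}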
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -62,6 +62,10 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)

case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

case class SparkListenerApplicationStart(appName: String, time: Long) extends SparkListenerEvent

case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/** An event used in the listener to shutdown the listener daemon thread. */
private[spark] case object SparkListenerShutdown extends SparkListenerEvent

@@ -125,6 +129,16 @@ trait SparkListener {
* Called when an RDD is manually unpersisted by the application
*/
def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }

/**
* Called when the application starts
*/
def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }

/**
* Called when the application ends
*/
def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
}
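
With the trait extended, any registered listener can observe application lifetime. A minimal sketch of a consumer; registration via SparkContext.addSparkListener (which predates this commit) is shown as a comment:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

// Measures wall-clock application lifetime using the two new callbacks.
class LifetimeListener extends SparkListener {
  private var start = -1L

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    start = applicationStart.time
    println("%s started".format(applicationStart.appName))
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    if (start != -1) {
      println("application ran for %d ms".format(applicationEnd.time - start))
    }
  }
}

// sc.addSparkListener(new LifetimeListener)  // sc: SparkContext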

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,10 @@ private[spark] trait SparkListenerBus {
sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
case unpersistRDD: SparkListenerUnpersistRDD =>
sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
case applicationStart: SparkListenerApplicationStart =>
sparkListeners.foreach(_.onApplicationStart(applicationStart))
case applicationEnd: SparkListenerApplicationEnd =>
sparkListeners.foreach(_.onApplicationEnd(applicationEnd))
case SparkListenerShutdown =>
}
}
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -34,7 +34,7 @@ private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
val listenerBus: SparkListenerBus,
val appName: String,
var appName: String,
val basePath: String = "")
extends WebUI("SparkUI") with Logging {

@@ -75,6 +75,8 @@ private[spark] class SparkUI(
// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener

def setAppName(name: String) = appName = name

/** Initialize all components of the server */
def start() {
storage.start()
core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -29,10 +29,11 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Environment

private[ui] class EnvironmentUI(parent: SparkUI) {
private val appName = parent.appName
private val basePath = parent.basePath
private var _listener: Option[EnvironmentListener] = None

private def appName = parent.appName

lazy val listener = _listener.get

def start() {
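
The one-line change in EnvironmentUI (private val appName becoming private def appName) matters because SparkUI.appName is now a var that the HistoryServer mutates through setAppName after replay: a def re-reads the field on every access, while a val would freeze the name captured at construction. A self-contained illustration with stand-in classes:

// Stand-ins for SparkUI and its tabs (illustration only, not the real classes).
class ToyUI { var appName = "app-1234" }
class ValTab(parent: ToyUI) { val appName = parent.appName }  // frozen at construction
class DefTab(parent: ToyUI) { def appName = parent.appName }  // re-read on every access

object ValVsDef extends App {
  val ui = new ToyUI
  val valTab = new ValTab(ui)
  val defTab = new DefTab(ui)
  ui.appName = "word count (history)"  // e.g. HistoryServer calling setAppName
  println(valTab.appName)  // prints "app-1234" -- stale
  println(defTab.appName)  // prints "word count (history)"
}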
(Diff truncated: 11 of the 21 changed files are not shown above.)
