-    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
-    val backButton =
-      if (startByte > 0) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
-          .format(appId, executorId, logType, math.max(startByte - byteLength, 0), byteLength)}>
-          <button type="button" class="btn btn-default">
-            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Previous 0 B
-        </button>
-      }
-
-    val nextButton =
-      if (endByte < logLength) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
-          .format(appId, executorId, logType, endByte, byteLength)}>
-          <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Next 0 B
-        </button>
-      }
-
-    val content =
-      <html>
-        <body>
-          {linkToMaster}
-          <div>
-            <div style="float:left; margin-right:10px">{backButton}</div>
-            <div style="float:left;">{range}</div>
-            <div style="float:right; margin-left:10px">{nextButton}</div>
-          </div>
-          <br />
-          <div style="height:500px; overflow:auto; padding:5px;">
-            <pre>{logText}</pre>
-          </div>
-        </body>
-      </html>
- UIUtils.basicSparkPage(content, logType + " log page for " + appId)
- }
-
- /** Determine the byte range for a log or log page. */
- private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
- val defaultBytes = 100 * 1024
- val maxBytes = 1024 * 1024
- val file = new File(path)
- val logLength = file.length()
- val getOffset = offset.getOrElse(logLength - defaultBytes)
- val startByte =
- if (getOffset < 0) 0L
- else if (getOffset > logLength) logLength
- else getOffset
- val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte + logPageLength, logLength)
- (startByte, endByte)
- }
-
- def stop() {
- assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
- serverInfo.get.server.stop()
- }
}
private[spark] object WorkerWebUI {
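
The getByteRange helper above clamps the requested window into [0, logLength] and caps a
page at one megabyte. A standalone sketch of the same arithmetic, with the log length passed
in directly instead of read from a File and the if chain rewritten with min/max; the object
and method names are illustrative, not part of the patch:

    object ByteRangeSketch {
      def byteRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
        val defaultBytes = 100 * 1024
        val maxBytes = 1024 * 1024
        // Default to a window covering roughly the last 100 KB of the log
        val requested = offset.getOrElse(logLength - defaultBytes)
        // Clamp the start into [0, logLength], then cap the page length at 1 MB
        val startByte = math.min(math.max(requested, 0L), logLength)
        val endByte = math.min(startByte + math.min(byteLength, maxBytes), logLength)
        (startByte, endByte)
      }

      def main(args: Array[String]) {
        println(byteRange(500000L, None, 64 * 1024))          // (397600,463136): default window
        println(byteRange(500000L, Some(-10L), 64 * 1024))    // (0,65536): negative offset clamps to 0
        println(byteRange(500000L, Some(900000L), 64 * 1024)) // (500000,500000): offset past end of file
      }
    }
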
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ef1ad872c8ef7..d8ea1b13362e3 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,103 +17,78 @@
package org.apache.spark.ui
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentUI
-import org.apache.spark.ui.exec.ExecutorsUI
-import org.apache.spark.ui.jobs.JobProgressUI
-import org.apache.spark.ui.storage.BlockManagerUI
+import org.apache.spark.ui.env.EnvironmentTab
+import org.apache.spark.ui.exec.ExecutorsTab
+import org.apache.spark.ui.jobs.JobProgressTab
+import org.apache.spark.ui.storage.BlockManagerTab
import org.apache.spark.util.Utils
-/** Top level user interface for Spark */
+/**
+ * Top level user interface for Spark.
+ */
private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
+ val securityManager: SecurityManager,
val listenerBus: SparkListenerBus,
val appName: String,
val basePath: String = "")
- extends Logging {
+ extends WebUI(securityManager, basePath) with Logging {
- def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
+ def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
- this(null, conf, listenerBus, appName, basePath)
+ this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null
- val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
-
private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
- private var serverInfo: Option[ServerInfo] = None
- private val storage = new BlockManagerUI(this)
- private val jobs = new JobProgressUI(this)
- private val env = new EnvironmentUI(this)
- private val exec = new ExecutorsUI(this)
+ // Maintain executor storage status through Spark events
+ val storageStatusListener = new StorageStatusListener
- val handlers: Seq[ServletContextHandler] = {
- val metricsServletHandlers = if (live) {
- SparkEnv.get.metricsSystem.getServletHandlers
- } else {
- Array[ServletContextHandler]()
+ /** Initialize all components of the server. Must be called before bind(). */
+ def start() {
+ attachTab(new BlockManagerTab(this))
+ attachTab(new JobProgressTab(this))
+ attachTab(new EnvironmentTab(this))
+ attachTab(new ExecutorsTab(this))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createRedirectHandler("/", "/stages", basePath))
+ if (live) {
+ sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
- storage.getHandlers ++
- jobs.getHandlers ++
- env.getHandlers ++
- exec.getHandlers ++
- metricsServletHandlers ++
- Seq[ServletContextHandler] (
- createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
- createRedirectHandler("/", "/stages", basePath)
- )
+ // Storage status listener must receive events first, as other listeners depend on its state
+ listenerBus.addListener(storageStatusListener)
+ getListeners.foreach(listenerBus.addListener)
}
- // Maintain executor storage status through Spark events
- val storageStatusListener = new StorageStatusListener
-
- /** Bind the HTTP server which backs this web interface */
+  /** Bind the HTTP server behind this web interface. */
def bind() {
+    assert(handlers.nonEmpty, "SparkUI has not started yet!")
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
- logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
+ logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
- logError("Failed to create Spark JettyUtils", e)
+ logError("Failed to create Spark web UI", e)
System.exit(1)
}
}
- def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
- /** Initialize all components of the server */
- def start() {
- storage.start()
- jobs.start()
- env.start()
- exec.start()
-
- // Storage status listener must receive events first, as other listeners depend on its state
- listenerBus.addListener(storageStatusListener)
- listenerBus.addListener(storage.listener)
- listenerBus.addListener(jobs.listener)
- listenerBus.addListener(env.listener)
- listenerBus.addListener(exec.listener)
- }
-
- def stop() {
- assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!")
- serverInfo.get.server.stop()
- logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
+ /** Stop the server behind this web interface. Only valid after bind(). */
+ override def stop() {
+ super.stop()
+ logInfo("Stopped Spark web UI at %s".format(appUIAddress))
}
private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort
-
}
private[spark] object SparkUI {
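
With SparkUI extending the new WebUI base class, the lifecycle contract is explicit: start()
attaches every tab and handler, bind() starts Jetty, and stop() is only valid after bind().
A sketch of the intended call order, assuming a live SparkContext sc is in scope:

    val ui = new SparkUI(sc)
    ui.start()                // attach tabs, static/redirect handlers, metrics servlets
    ui.bind()                 // start Jetty; boundPort and appUIAddress are valid from here on
    println(ui.appUIAddress)  // e.g. http://<publicHost>:<boundPort>
    // ... application runs ...
    ui.stop()                 // asserts that bind() happened first
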
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a487924effbff..de4216849dc7d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,6 +17,9 @@
package org.apache.spark.ui
+import java.text.SimpleDateFormat
+import java.util.Date
+
import scala.xml.Node
/** Utility functions for generating XML pages with spark content. */
@@ -24,9 +27,32 @@ private[spark] object UIUtils {
import Page._
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
+ def formatDate(date: Date): String = dateFormat.get.format(date)
+
+ def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ "%.1f h".format(hours)
+ }
+
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
- private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
- getOrElse("")
+  val uiRoot: String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
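
The formatDuration thresholds above pick a unit and precision per range. Expected outputs,
shown as comments (the calls assume code inside the org.apache.spark package, since UIUtils
is private[spark]):

    UIUtils.formatDuration(45 * 1000L)       // "45 s"    (under 60 s: whole seconds)
    UIUtils.formatDuration(5 * 60 * 1000L)   // "5.0 min" (under 10 min: one decimal place)
    UIUtils.formatDuration(42 * 60 * 1000L)  // "42 min"  (10 to 59 min: whole minutes)
    UIUtils.formatDuration(90 * 60 * 1000L)  // "1.5 h"   (60 min and up: tenths of an hour)
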
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index a7b872f3445a4..6f7385086b534 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,34 +17,100 @@
package org.apache.spark.ui
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
/**
- * Utilities used throughout the web UI.
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ * All tabs and pages must be attached before bind() is called on the server.
*/
-private[spark] object WebUI {
- // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
- private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") {
+ protected val tabs = ArrayBuffer[UITab]()
+ protected val handlers = ArrayBuffer[ServletContextHandler]()
+ protected var serverInfo: Option[ServerInfo] = None
+
+ /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */
+ def attachTab(tab: UITab) {
+ tab.start()
+ tab.pages.foreach(attachPage)
+ tabs += tab
}
- def formatDate(date: Date): String = dateFormat.get.format(date)
+ /** Attach a page to this UI. Only valid before bind(). */
+ def attachPage(page: UIPage) {
+ val pagePath = "/" + page.prefix
+ attachHandler(createServletHandler(pagePath,
+ (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+ if (page.includeJson) {
+ attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+ }
+ }
- def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+ /** Attach a handler to this UI. Only valid before bind(). */
+ def attachHandler(handler: ServletContextHandler) {
+ handlers += handler
+ }
- def formatDuration(milliseconds: Long): String = {
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return "%.0f s".format(seconds)
- }
- val minutes = seconds / 60
- if (minutes < 10) {
- return "%.1f min".format(minutes)
- } else if (minutes < 60) {
- return "%.0f min".format(minutes)
- }
- val hours = minutes / 60
- return "%.1f h".format(hours)
+  /** Return the listeners of all attached tabs. */
+ def getListeners = tabs.flatMap(_.listener)
+
+ /** Return a list of handlers attached to this UI. */
+ def getHandlers = handlers.toSeq
+
+  /**
+   * Bind the HTTP server behind this web interface.
+   * Overriding implementations must set serverInfo.
+   */
+ def bind()
+
+ /** Return the actual port to which this server is bound. Only valid after bind(). */
+ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
+ /** Stop the server behind this web interface. Only valid after bind(). */
+ def stop() {
+ assert(serverInfo.isDefined,
+ "Attempted to stop %s before binding to a server!".format(Utils.getFormattedClassName(this)))
+ serverInfo.get.server.stop()
}
}
+
+/**
+ * A tab that represents a collection of pages and, by convention, a unit of listening for
+ * Spark events. A tab may own a SparkListener, but the association is optional.
+ */
+private[spark] abstract class UITab(val prefix: String) {
+ val pages = ArrayBuffer[UIPage]()
+ var listener: Option[SparkListener] = None
+
+ /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
+ def attachPage(page: UIPage) {
+ page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+ pages += page
+ }
+
+ def start()
+}
+
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the
+ * HTML and the JSON content, rather than just the former.
+ */
+private[spark] abstract class UIPage(var prefix: String, val includeJson: Boolean = false) {
+ def render(request: HttpServletRequest): Seq[Node] = Seq[Node]()
+ def renderJson(request: HttpServletRequest): JValue = JNothing
+}
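
To make the hierarchy concrete, here is a minimal hypothetical tab with a single page.
HelloTab and HelloPage are illustrative names, not part of the patch; the package clause is
needed only because the base classes are private[spark]:

    package org.apache.spark.ui

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    private[spark] class HelloTab extends UITab("hello") {
      def start() {
        attachPage(new HelloPage)  // page prefix "" is prepended with "hello": served at /hello
      }
    }

    private[spark] class HelloPage extends UIPage("") {
      override def render(request: HttpServletRequest): Seq[Node] =
        <p>Hello from a custom tab</p>
    }

Attaching the tab with webUI.attachTab(new HelloTab) calls start() and registers one servlet
handler per page; a page constructed with includeJson = true would get a second handler at
its path suffixed with /json. Both must happen before bind().
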
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000000000..dd4ea2a2332a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+
+ def start() {
+ listener = Some(new EnvironmentListener)
+ attachPage(new IndexPage(this))
+ }
+
+ def environmentListener = {
+ assert(listener.isDefined, "EnvironmentTab has not started yet!")
+ listener.get.asInstanceOf[EnvironmentListener]
+ }
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab.
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+ var jvmInformation = Seq[(String, String)]()
+ var sparkProperties = Seq[(String, String)]()
+ var systemProperties = Seq[(String, String)]()
+ var classpathEntries = Seq[(String, String)]()
+
+ override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+ synchronized {
+ val environmentDetails = environmentUpdate.environmentDetails
+ jvmInformation = environmentDetails("JVM Information")
+ sparkProperties = environmentDetails("Spark Properties")
+ systemProperties = environmentDetails("System Properties")
+ classpathEntries = environmentDetails("Classpath Entries")
+ }
+ }
+}
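
The handler above requires all four section keys to be present in the update; a missing key
would throw. A sketch of feeding the listener an event directly, with sample values:

    import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate

    val listener = new EnvironmentListener
    listener.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(Map(
      "JVM Information"   -> Seq("Java Version" -> "1.7.0_45"),
      "Spark Properties"  -> Seq("spark.ui.port" -> "4040"),
      "System Properties" -> Seq("user.dir" -> "/tmp"),
      "Classpath Entries" -> Seq("/opt/spark/lib" -> "System Classpath")
    )))
    assert(listener.sparkProperties == Seq("spark.ui.port" -> "4040"))
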
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
similarity index 62%
rename from core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
rename to core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
index 23e90c34d5b33..bf1872f18d54e 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
@@ -21,30 +21,15 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{UIUtils, UIPage}
import org.apache.spark.ui.Page.Environment
-private[ui] class EnvironmentUI(parent: SparkUI) {
+private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
- private var _listener: Option[EnvironmentListener] = None
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new EnvironmentListener)
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/environment",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
+ private val listener = parent.environmentListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
@@ -70,23 +55,3 @@ private[ui] class EnvironmentUI(parent: SparkUI) {
private def propertyRow(kv: (String, String)) =
}
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
- var jvmInformation = Seq[(String, String)]()
- var sparkProperties = Seq[(String, String)]()
- var systemProperties = Seq[(String, String)]()
- var classpathEntries = Seq[(String, String)]()
-
- override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
- synchronized {
- val environmentDetails = environmentUpdate.environmentDetails
- jvmInformation = environmentDetails("JVM Information")
- sparkProperties = environmentDetails("Spark Properties")
- systemProperties = environmentDetails("System Properties")
- classpathEntries = environmentDetails("Classpath Entries")
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000000000..2b833c58c8e44
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, UITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+
+ def start() {
+ listener = Some(new ExecutorsListener(parent.storageStatusListener))
+ attachPage(new IndexPage(this))
+ }
+
+ def executorsListener = {
+ assert(listener.isDefined, "ExecutorsTab has not started yet!")
+ listener.get.asInstanceOf[ExecutorsListener]
+ }
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab.
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+ extends SparkListener {
+
+ val executorToTasksActive = HashMap[String, Int]()
+ val executorToTasksComplete = HashMap[String, Int]()
+ val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
+
+ def storageStatusList = storageStatusListener.storageStatusList
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ val eid = formatExecutorId(taskStart.taskInfo.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ val info = taskEnd.taskInfo
+ if (info != null) {
+ val eid = formatExecutorId(info.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+ executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ case _ =>
+ executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ }
+
+ // Update shuffle read/write
+ val metrics = taskEnd.taskMetrics
+ if (metrics != null) {
+ metrics.shuffleReadMetrics.foreach { shuffleRead =>
+ executorToShuffleRead(eid) =
+ executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+ }
+ metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ executorToShuffleWrite(eid) =
+ executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+ }
+ }
+ }
+ }
+
+  // This addresses executor ID inconsistencies in local mode
+ private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}
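
One subtlety in the tallies above: onTaskEnd decrements with getOrElse(eid, 1) - 1, so an end
event with no matching start leaves the active count at 0 rather than driving it negative. A
self-contained sketch of the pattern, with illustrative names:

    import scala.collection.mutable.HashMap

    object TallySketch extends App {
      val active = HashMap[String, Int]()

      def taskStarted(eid: String) { active(eid) = active.getOrElse(eid, 0) + 1 }
      def taskEnded(eid: String) { active(eid) = active.getOrElse(eid, 1) - 1 }

      taskStarted("0"); taskEnded("0")
      taskEnded("1")   // end without a matching start
      println(active)  // both executors end at 0, e.g. Map(1 -> 0, 0 -> 0)
    }
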
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
similarity index 60%
rename from core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
rename to core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
index 031ed88a493a8..fbbba2f63878f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
@@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
* limitations under the License.
*/
@@ -19,36 +19,18 @@ package org.apache.spark.ui.exec
import javax.servlet.http.HttpServletRequest
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
import org.apache.spark.util.Utils
-private[ui] class ExecutorsUI(parent: SparkUI) {
+private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
- private var _listener: Option[ExecutorsListener] = None
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new ExecutorsListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/executors",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
+ private val listener = parent.executorsListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
@@ -68,11 +50,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
-    <div class = "row">
-      <div class="span12">
-        {execTable}
-      </div>
-    </div>;
+      <div class="row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
UIUtils.headerSparkPage(
content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
@@ -158,55 +140,3 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
execFields.zip(execValues).toMap
}
}
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
- extends SparkListener {
-
- val executorToTasksActive = HashMap[String, Int]()
- val executorToTasksComplete = HashMap[String, Int]()
- val executorToTasksFailed = HashMap[String, Int]()
- val executorToDuration = HashMap[String, Long]()
- val executorToShuffleRead = HashMap[String, Long]()
- val executorToShuffleWrite = HashMap[String, Long]()
-
- def storageStatusList = storageStatusListener.storageStatusList
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val eid = formatExecutorId(taskStart.taskInfo.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val info = taskEnd.taskInfo
- if (info != null) {
- val eid = formatExecutorId(info.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
- executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
- taskEnd.reason match {
- case e: ExceptionFailure =>
- executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
- case _ =>
- executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- }
-
- // Update shuffle read/write
- val metrics = taskEnd.taskMetrics
- if (metrics != null) {
- metrics.shuffleReadMetrics.foreach { shuffleRead =>
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
- }
- metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
- }
- }
- }
- }
-
- // This addresses executor ID inconsistencies in the local mode
- private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae6746da..31173e48d7a1e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.xml.Node
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
- private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+ private lazy val listener = parent.jobProgressListener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b66a4829..c600e58af004d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -23,23 +23,23 @@ import scala.xml.{Node, NodeSeq}
import org.apache.spark.scheduler.Schedulable
import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
+private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
- private lazy val listener = parent.listener
+ private lazy val listener = parent.jobProgressListener
private lazy val isFairScheduler = parent.isFairScheduler
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
- val now = System.currentTimeMillis()
+ val now = System.currentTimeMillis
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val completedStagesTable =
@@ -57,7 +57,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
// Total duration is not meaningful unless the UI is live
Total Duration:
- {parent.formatDuration(now - sc.startTime)}
+ {UIUtils.formatDuration(now - sc.startTime)}
}}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
similarity index 50%
rename from core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
rename to core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
index b2c67381cc3da..c40b75d684510 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -17,44 +17,29 @@
package org.apache.spark.ui.jobs
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{SparkUI, UITab}
/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
+private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") {
val appName = parent.appName
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
- lazy val listener = _listener.get
- lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
- private val indexPage = new IndexPage(this)
- private val stagePage = new StagePage(this)
- private val poolPage = new PoolPage(this)
- private var _listener: Option[JobProgressListener] = None
-
def start() {
val conf = if (live) sc.conf else new SparkConf
- _listener = Some(new JobProgressListener(conf))
+ listener = Some(new JobProgressListener(conf))
+ attachPage(new IndexPage(this))
+ attachPage(new StagePage(this))
+ attachPage(new PoolPage(this))
}
- def formatDuration(ms: Long) = Utils.msDurationToString(ms)
+ def jobProgressListener = {
+ assert(listener.isDefined, "JobProgressTab has not started yet!")
+ listener.get.asInstanceOf[JobProgressListener]
+ }
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/stages/stage",
- (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages/pool",
- (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
+ def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
}
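
The guarded accessor here mirrors environmentListener and executorsListener: UITab stores an
untyped Option[SparkListener], and each tab downcasts after start(). A usage sketch, assuming
a SparkUI parentUi is in scope:

    val tab = new JobProgressTab(parentUi)
    // tab.jobProgressListener             // would trip the assert: start() has not run yet
    tab.start()                            // creates the listener and attaches the three pages
    tab.jobProgressListener.schedulingMode // safe to read after start()
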
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index bd33182b70059..53200ecdd4fee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -23,17 +23,17 @@ import scala.xml.Node
import org.apache.spark.scheduler.{Schedulable, StageInfo}
import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
/** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") {
private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
- private lazy val listener = parent.listener
+ private lazy val listener = parent.jobProgressListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d8668740b..bb7a9c14f7761 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
private val basePath = parent.basePath
- private val poolToActiveStages = listener.poolToActiveStages
- private lazy val listener = parent.listener
+ private lazy val listener = parent.jobProgressListener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
SchedulingMode
- {rows.map(r => makeRow(r, poolToActiveStages))}
+ {rows.map(r => makeRow(r, listener.poolToActiveStages))}
}
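
The PoolTable change is more than a rename: the old eager field forced the lazy listener at
construction time, which fails if a table is built before the UI's start() has created the
listener; the new code defers the read to render time, inside the synchronized block. A
minimal sketch of that initialization-order hazard, with illustrative names:

    object InitOrderSketch extends App {
      class Parent {
        private var _listener: Option[String] = None
        lazy val listener = _listener.get           // throws if forced before start()
        def start() { _listener = Some("ready") }
      }
      class Child(parent: Parent) { val snapshot = parent.listener }  // eager read, like the old field

      val p = new Parent
      // new Child(p) here would throw NoSuchElementException: None.get
      p.start()
      println(new Child(p).snapshot)  // "ready": safe only after start()
    }
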
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0c55f2ee7e944..bd3d878b1567f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,16 +23,16 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") {
private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private lazy val listener = parent.jobProgressListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
@@ -58,7 +58,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
var activeTime = 0L
- val now = System.currentTimeMillis()
+ val now = System.currentTimeMillis
val tasksActive = listener.stageIdToTasksActive(stageId).values
tasksActive.foreach(activeTime += _.timeRunning(now))
@@ -68,7 +68,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
Total task time across all tasks:
- {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
{if (hasShuffleRead)
@@ -119,13 +119,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val serializationQuantiles =
"Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+ get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => parent.formatDuration(ms.toLong))
+ .map(ms => UIUtils.formatDuration(ms.toLong))
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -136,7 +136,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val gettingResultQuantiles = "Time spent fetching task results" +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -153,7 +153,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val schedulerDelayQuantiles = "Scheduler delay" +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
def getQuantileCols(data: Seq[Double]) =
@@ -217,8 +217,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
taskData match { case TaskUIData(info, metrics, exception) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
@@ -233,8 +233,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else parent.formatDuration(ms)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
}.getOrElse("")
val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -252,15 +252,15 @@ private[ui] class StagePage(parent: JobProgressUI) {