@@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val serializationQuantiles =
"Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+ get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => parent.formatDuration(ms.toLong))
+ .map(ms => UIUtils.formatDuration(ms.toLong))
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val gettingResultQuantiles = "Time spent fetching task results" +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val schedulerDelayQuantiles = "Scheduler delay" +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
def getQuantileCols(data: Seq[Double]) =
@@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
<h4>Tasks</h4> ++ taskTable
- UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+ parent.headerTabs, parent)
}
}
@@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
taskData match { case TaskUIData(info, metrics, exception) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
@@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else parent.formatDuration(ms)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
}.getOrElse("")
val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
{info.status} |
{info.taskLocality} |
{info.host} |
- {WebUI.formatDate(new Date(info.launchTime))} |
+ {UIUtils.formatDate(new Date(info.launchTime))} |
{formatDuration}
|
- {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
|
- {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
|
{if (shuffleRead) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index e419fae5a6589..8c5b1f55fd2dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(
- stages: Seq[StageInfo],
- parent: JobProgressUI,
- killEnabled: Boolean = false) {
+ stages: Seq[StageInfo],
+ parent: JobProgressTab,
+ killEnabled: Boolean = false) {
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
def toNodeSeq: Seq[Node] = {
@@ -89,25 +89,23 @@ private[ui] class StageTable(
{s.name}
- val description = listener.stageIdToDescription.get(s.stageId)
+ listener.stageIdToDescription.get(s.stageId)
.map(d => {d} {nameLink} {killLink} )
.getOrElse( {killLink}{nameLink} )
-
- return description
}
/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
- case Some(t) => WebUI.formatDate(new Date(t))
+ case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
val duration = s.submissionTime.map { t =>
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
- val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val startedTasks =
listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
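The recurring change above swaps the per-page helpers (parent.formatDuration, WebUI.formatDate) for the shared ones on UIUtils. A minimal sketch of a call site after the change; the output strings shown are illustrative assumptions, not verified formats:

    import java.util.Date
    import org.apache.spark.ui.UIUtils

    // Same helpers, now reached through UIUtils rather than the enclosing page/tab.
    val duration = UIUtils.formatDuration(123456L)   // e.g. "2.1 min" (illustrative)
    val launched = UIUtils.formatDate(new Date())    // e.g. "2014/04/11 12:00:00" (illustrative)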
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 75ee9976d7b5f..d07f1c9b20fcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+ return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+ parent.headerTabs, parent)
}
// Worker table
@@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
;
- UIUtils.headerSparkPage(
- content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+ parent.headerTabs, parent)
}
/** Header fields for the worker table */
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
similarity index 90%
rename from core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
rename to core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 4f6acc30a88c4..b66edd91f56c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -22,22 +22,19 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
+private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
-
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
- UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
}
/** Header fields for the RDD table */
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
similarity index 75%
rename from core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
rename to core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 16996a2da1e72..56429f6c07fcd 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -17,45 +17,27 @@
package org.apache.spark.ui.storage
-import javax.servlet.http.HttpServletRequest
-
import scala.collection.mutable
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
+private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
+ val appName = parent.appName
val basePath = parent.basePath
+ val listener = new StorageListener(parent.storageStatusListener)
- private val indexPage = new IndexPage(this)
- private val rddPage = new RDDPage(this)
- private var _listener: Option[BlockManagerListener] = None
-
- lazy val listener = _listener.get
-
- def appName = parent.appName
-
- def start() {
- _listener = Some(new BlockManagerListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/storage/rdd",
- (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
- createServletHandler("/storage",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
+ attachPage(new StoragePage(this))
+ attachPage(new RddPage(this))
+ parent.registerListener(listener)
}
/**
* A SparkListener that prepares information to be displayed on the BlockManagerUI
*/
-private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener)
+private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {
private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
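StorageTab above follows the tab/page pattern the whole patch adopts. A hypothetical FooTab, sketched only from the signatures used in these diffs (WebUITab(parent, prefix), WebUIPage(prefix), attachPage, attachTab); it is not part of the patch:

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab}

    /** Hypothetical tab mirroring the shape of StorageTab above. */
    class FooTab(parent: SparkUI) extends WebUITab(parent, "foo") {
      val appName = parent.appName
      val basePath = parent.basePath

      // Pages register themselves against the tab instead of building Jetty handlers by hand.
      attachPage(new WebUIPage("") {
        def render(request: HttpServletRequest): Seq[Node] = <p>Hello from foo</p>
      })
    }
    // A tab outside SparkUI's defaults still needs attaching, e.g. parent.attachTab(new FooTab(parent)).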
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f2396f7c80a35..465835ea7fe29 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -88,30 +88,27 @@ private[spark] object JsonProtocol {
def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
val taskInfo = taskStart.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
("Event" -> Utils.getFormattedClassName(taskStart)) ~
("Stage ID" -> taskStart.stageId) ~
- ("Task Info" -> taskInfoJson)
+ ("Task Info" -> taskInfoToJson(taskInfo))
}
def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
val taskInfo = taskGettingResult.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
- ("Task Info" -> taskInfoJson)
+ ("Task Info" -> taskInfoToJson(taskInfo))
}
def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
val taskEndReason = taskEndReasonToJson(taskEnd.reason)
val taskInfo = taskEnd.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
val taskMetrics = taskEnd.taskMetrics
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
("Event" -> Utils.getFormattedClassName(taskEnd)) ~
("Stage ID" -> taskEnd.stageId) ~
("Task Type" -> taskEnd.taskType) ~
("Task End Reason" -> taskEndReason) ~
- ("Task Info" -> taskInfoJson) ~
+ ("Task Info" -> taskInfoToJson(taskInfo)) ~
("Task Metrics" -> taskMetricsJson)
}
@@ -505,6 +502,9 @@ private[spark] object JsonProtocol {
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
+ if (json == JNothing) {
+ return TaskMetrics.empty
+ }
val metrics = new TaskMetrics
metrics.hostname = (json \ "Host Name").extract[String]
metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
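A small sketch of what the new JNothing fallback buys (not part of the patch): replaying an event whose task metrics were never written now yields an empty TaskMetrics instead of failing to parse.

    import org.json4s.JsonAST.JNothing

    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.util.JsonProtocol

    // Before this change, a missing "Task Metrics" field made taskMetricsFromJson throw.
    val metrics: TaskMetrics = JsonProtocol.taskMetricsFromJson(JNothing)  // now TaskMetrics.empty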
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2f9739f940dc6..b85c483ca2a08 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,16 +18,81 @@
package org.apache.spark.ui
import java.net.ServerSocket
+import javax.servlet.http.HttpServletRequest
+import scala.io.Source
import scala.util.{Failure, Success, Try}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.LocalSparkContext._
+import scala.xml.Node
class UISuite extends FunSuite {
+
+ test("basic ui visibility") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // test if the ui is visible, and all the expected tabs are visible
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ assert(html.toLowerCase.contains("stages"))
+ assert(html.toLowerCase.contains("storage"))
+ assert(html.toLowerCase.contains("environment"))
+ assert(html.toLowerCase.contains("executors"))
+ }
+ }
+ }
+
+ test("visibility at localhost:4040") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // test if visible from http://localhost:4040
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL("http://localhost:4040").mkString
+ assert(html.toLowerCase.contains("stages"))
+ }
+ }
+ }
+
+ test("attaching a new tab") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val sparkUI = sc.ui
+
+ val newTab = new WebUITab(sparkUI, "foo") {
+ attachPage(new WebUIPage("") {
+ def render(request: HttpServletRequest): Seq[Node] = {
+ <b>"html magic"</b>
+ }
+ })
+ }
+ sparkUI.attachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+
+ // check whether new page exists
+ assert(html.toLowerCase.contains("foo"))
+
+ // check whether other pages still exist
+ assert(html.toLowerCase.contains("stages"))
+ assert(html.toLowerCase.contains("storage"))
+ assert(html.toLowerCase.contains("environment"))
+ assert(html.toLowerCase.contains("executors"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+ // check whether new page exists
+ assert(html.contains("magic"))
+ }
+ }
+ }
+
test("jetty port increases under contention") {
val startPort = 4040
val server = new Server(startPort)
@@ -60,4 +125,18 @@ class UISuite extends FunSuite {
case Failure(e) =>
}
}
+
+ test("verify appUIAddress contains the scheme") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val uiAddress = sc.ui.appUIAddress
+ assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+ }
+ }
+
+ test("verify appUIAddress contains the port") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val splitUIAddress = sc.ui.appUIAddress.split(':')
+ assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f75297a02dc8b..16470bb7bf60d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -523,8 +523,8 @@ class JsonProtocolSuite extends FunSuite {
700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
[{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status":
- {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
- "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
+ {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,
+ "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
"""
private val jobStartJsonString =
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 5ea4817bfde18..9cb31d70444ff 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -60,6 +60,7 @@ object MimaBuild {
Seq(
excludePackage("org.apache.spark.api.java"),
excludePackage("org.apache.spark.streaming.api.java"),
+ excludePackage("org.apache.spark.streaming.scheduler"),
excludePackage("org.apache.spark.mllib")
) ++
excludeSparkClass("rdd.ClassTags") ++
@@ -70,7 +71,12 @@ object MimaBuild {
excludeSparkClass("mllib.regression.LassoWithSGD") ++
excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
excludeSparkClass("streaming.dstream.NetworkReceiver") ++
- excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor")
+ excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++
+ excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
+ excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
+ excludeSparkClass("streaming.dstream.ReportError") ++
+ excludeSparkClass("streaming.dstream.ReportBlock") ++
+ excludeSparkClass("streaming.dstream.DStream")
case _ => Seq()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a4e236c65ff86..ff5d0aaa3d0bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,29 +17,28 @@
package org.apache.spark.streaming
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -158,6 +157,8 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
+ private[streaming] val uiTab = new StreamingTab(this)
+
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200f71a0b..a7e5215437e54 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}
- /* Adds metadata to the Stream while it is running.
- * This method should be overwritten by sublcasses of InputDStream.
- */
- private[streaming] def addMetadata(metadata: Any) {
- if (metadata != null) {
- logInfo("Dropping Metadata: " + metadata.toString)
- }
- }
-
/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d19a635fe8eca..5a249706b4d2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,24 +17,23 @@
package org.apache.spark.streaming.dstream
-import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import java.nio.ByteBuffer
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
-import scala.concurrent.duration._
import scala.reflect.ClassTag
-import akka.actor.{Props, Actor}
+import akka.actor.{Actor, Props}
import akka.pattern.ask
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{RDD, BlockRDD}
+import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
- // This is an unique identifier that is used to match the network receiver with the
- // corresponding network input stream.
+ /** Keeps all received blocks information */
+ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+ /** This is a unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()
/**
@@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
def stop() {}
+ /** Asks NetworkInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
- val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime)
+ val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
+ receivedBlockInfo(validTime) = blockInfo
+ val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
+
+ /** Get information on received blocks. */
+ private[streaming] def getReceivedBlockInfo(time: Time) = {
+ receivedBlockInfo(time)
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This
+ * implementation overrides the default implementation to clear received
+ * block information.
+ */
+ private[streaming] override def clearMetadata(time: Time) {
+ super.clearMetadata(time)
+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+ receivedBlockInfo --= oldReceivedBlocks.keys
+ logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+ }
}
private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver() extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
- extends NetworkReceiverMessage
-private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
/**
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
@@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
}.mkString("\n")
}
+
logInfo("Deregistering receiver " + streamId)
val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
Await.result(future, askTimeout)
@@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/**
* Push a block (as an ArrayBuffer filled with data) into the block manager.
*/
- def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ arrayBuffer: ArrayBuffer[T],
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
logDebug("Pushed block " + blockId)
}
/**
* Push a block (as bytes) into the block manager.
*/
- def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ bytes: ByteBuffer,
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
}
/** Set the ID of the DStream that this receiver is associated with */
@@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
private class NetworkReceiverActor extends Actor {
override def preStart() {
- logInfo("Registered receiver " + streamId)
- val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout)
+ val msg = RegisterReceiver(
+ streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
+ val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
+ logInfo("Registered receiver " + streamId)
}
override def receive() = {
@@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
- case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
+ case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 7f3cd2f8eb1fd..9c69a2a4e21f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
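With the new receivedBlockInfo field, listeners can derive record counts per batch. A hedged sketch (the helper name is invented, the field types come from the diff above):

    import org.apache.spark.streaming.scheduler.BatchInfo

    // Sum the records reported by every receiver's blocks for one batch.
    def recordsInBatch(batch: BatchInfo): Long =
      batch.receivedBlockInfo.values.map(_.map(_.numRecords).sum).sum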
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 92d885c4bc5a5..e564eccba2df5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
- jobScheduler.runJobs(time, graph.generateJobs(time))
+ jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)
// Restart the timer
@@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
- jobScheduler.runJobs(time, jobs)
+ val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+ val streamId = stream.id
+ val receivedBlockInfo = stream.getReceivedBlockInfo(time)
+ (streamId, receivedBlockInfo)
+ }.toMap
+ jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 04e0a6a283cfb..d9ada99b472ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -100,14 +100,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logInfo("Stopped JobScheduler")
}
- def runJobs(time: Time, jobs: Seq[Job]) {
- if (jobs.isEmpty) {
- logInfo("No jobs added for time " + time)
+ def submitJobSet(jobSet: JobSet) {
+ if (jobSet.jobs.isEmpty) {
+ logInfo("No jobs added for time " + jobSet.time)
} else {
- val jobSet = new JobSet(time, jobs)
- jobSets.put(time, jobSet)
+ jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- logInfo("Added jobs for time " + time)
+ logInfo("Added jobs for time " + jobSet.time)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index fcf303aee6cd7..a69d74362173e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
-case class JobSet(time: Time, jobs: Seq[Job]) {
+case class JobSet(
+ time: Time,
+ jobs: Seq[Job],
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
+ ) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
+ receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 067e804202236..a1e6f5176825a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -17,20 +17,42 @@
package org.apache.spark.streaming.scheduler
-import scala.collection.mutable.{HashMap, Queue, SynchronizedMap}
+import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import akka.actor._
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils
+/** Information about receiver */
+case class ReceiverInfo(streamId: Int, typ: String, location: String) {
+ override def toString = s"$typ-$streamId"
+}
+
+/** Information about blocks received by the network receiver */
+case class ReceivedBlockInfo(
+ streamId: Int,
+ blockId: StreamBlockId,
+ numRecords: Long,
+ metadata: Any
+ )
+
+/**
+ * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * with each other.
+ */
private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
- extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+private[streaming] case class RegisterReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef
+ ) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
- val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]]
+ val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
+ with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
-
+ val listenerBus = ssc.scheduler.listenerBus
// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
@@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
}
+ /** Return all the blocks received from a receiver. */
+ def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
+ val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
+ logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
+ receivedBlockInfo.toArray
+ }
+
+ private def getReceivedBlockInfoQueue(streamId: Int) = {
+ receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
+ }
+
/** Register a receiver */
- def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) {
+ def registerReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef,
+ sender: ActorRef
+ ) {
if (!networkInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
+ ReceiverInfo(streamId, typ, host)
+ ))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
}
@@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
}
- /** Get all the received blocks for the given stream. */
- def getBlocks(streamId: Int, time: Time): Array[BlockId] = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]())
- val result = queue.dequeueAll(x => true).toArray
- logInfo("Stream " + streamId + " received " + result.size + " blocks")
- result
- }
-
/** Add new blocks for the given stream */
- def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId])
- queue ++= blockIds
- networkInputStreamMap(streamId).addMetadata(metadata)
- logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]"))
+ def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
+ getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
+ logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
+ receivedBlockInfo.blockId)
}
/** Check if any blocks are left to be processed */
def hasMoreReceivedBlockIds: Boolean = {
- !receivedBlockIds.forall(_._2.isEmpty)
+ !receivedBlockInfo.values.forall(_.isEmpty)
}
/** Actor to receive messages from the receivers. */
private class NetworkInputTrackerActor extends Actor {
def receive = {
- case RegisterReceiver(streamId, receiverActor) =>
- registerReceiver(streamId, receiverActor, sender)
+ case RegisterReceiver(streamId, typ, host, receiverActor) =>
+ registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
- case AddBlocks(streamId, blockIds, metadata) =>
- addBlocks(streamId, blockIds, metadata)
+ case AddBlock(receivedBlockInfo) =>
+ addBlocks(receivedBlockInfo)
case DeregisterReceiver(streamId, message) =>
deregisterReceiver(streamId, message)
sender ! true
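A sketch of the receiver-side flow after the rename (the stream id, record count, and actor reference are placeholders): each pushBlock now wraps its block in a ReceivedBlockInfo and sends a single AddBlock message, which the actor above routes to addBlocks.

    import akka.actor.ActorRef

    import org.apache.spark.storage.StreamBlockId
    import org.apache.spark.streaming.scheduler.{AddBlock, ReceivedBlockInfo}

    def reportBlock(trackerActor: ActorRef, streamId: Int, numRecords: Long): Unit = {
      val blockId = StreamBlockId(streamId, System.currentTimeMillis())
      // One message per block, carrying its size in records; metadata is passed through unchanged.
      trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, numRecords, metadata = null))
    }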
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 461ea3506477f..5db40ebbeb1de 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent
+case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
+ extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
* computation.
*/
trait StreamingListener {
- /**
- * Called when processing of a batch has completed
- */
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
- /**
- * Called when processing of a batch has started
- */
+ /** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}
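An illustrative listener using the two new callbacks (not part of the patch; the class name and printed text are invented, the callback signatures come from the trait above):

    import org.apache.spark.streaming.scheduler._

    class PrintingStreamingListener extends StreamingListener {
      override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
        println("Receiver started: " + started.receiverInfo)
      }
      override def onBatchSubmitted(submitted: StreamingListenerBatchSubmitted) {
        println("Batch submitted for time " + submitted.batchInfo.batchTime)
      }
    }
    // Attached the usual way: ssc.addStreamingListener(new PrintingStreamingListener)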
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 18811fc2b01d8..ea03dfc7bfeea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
while (true) {
val event = eventQueue.take
event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listeners.foreach(_.onReceiverStarted(receiverStarted))
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
new file mode 100644
index 0000000000000..8b025b09ed34d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.{Queue, HashMap}
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.ReceiverInfo
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
+import org.apache.spark.util.Distribution
+
+
+private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+
+ private val waitingBatchInfos = new HashMap[Time, BatchInfo]
+ private val runningBatchInfos = new HashMap[Time, BatchInfo]
+ private val completedaBatchInfos = new Queue[BatchInfo]
+ private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private var totalCompletedBatches = 0L
+ private val receiverInfos = new HashMap[Int, ReceiverInfo]
+
+ val batchDuration = ssc.graph.batchDuration.milliseconds
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+ synchronized {
+ receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+ }
+ }
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
+ runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+ runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
+ waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
+ waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+ if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+ totalCompletedBatches += 1L
+ }
+
+ def numNetworkReceivers = synchronized {
+ ssc.graph.getNetworkInputStreams().size
+ }
+
+ def numTotalCompletedBatches: Long = synchronized {
+ totalCompletedBatches
+ }
+
+ def numUnprocessedBatches: Long = synchronized {
+ waitingBatchInfos.size + runningBatchInfos.size
+ }
+
+ def waitingBatches: Seq[BatchInfo] = synchronized {
+ waitingBatchInfos.values.toSeq
+ }
+
+ def runningBatches: Seq[BatchInfo] = synchronized {
+ runningBatchInfos.values.toSeq
+ }
+
+ def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
+ completedaBatchInfos.toSeq
+ }
+
+ def processingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.processingDelay)
+ }
+
+ def schedulingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.schedulingDelay)
+ }
+
+ def totalDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.totalDelay)
+ }
+
+ def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
+ val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
+ val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+ (0 until numNetworkReceivers).map { receiverId =>
+ val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
+ batchInfo.get(receiverId).getOrElse(Array.empty)
+ }
+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
+ // calculate records per second for each batch
+ blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+ }
+ val distributionOption = Distribution(recordsOfParticularReceiver)
+ (receiverId, distributionOption)
+ }.toMap
+ }
+
+ def lastReceivedBatchRecords: Map[Int, Long] = {
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+ lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+ (0 until numNetworkReceivers).map { receiverId =>
+ (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ }.toMap
+ }.getOrElse {
+ (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ }
+ }
+
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ receiverInfos.get(receiverId)
+ }
+
+ def lastCompletedBatch: Option[BatchInfo] = {
+ completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ }
+
+ def lastReceivedBatch: Option[BatchInfo] = {
+ retainedBatches.lastOption
+ }
+
+ private def retainedBatches: Seq[BatchInfo] = synchronized {
+ (waitingBatchInfos.values.toSeq ++
+ runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ }
+
+ private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+ Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
+}
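The per-receiver rate in receivedRecordsDistributions above is records per batch scaled to records per second; a quick worked example with made-up numbers:

    // 500 records received in one 2000 ms batch:
    val numRecords = 500L
    val batchDurationMs = 2000L
    val recordsPerSecond = numRecords.toDouble * 1000 / batchDurationMs  // 250.0 records/sec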
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
new file mode 100644
index 0000000000000..6607437db560a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Distribution
+
+/** Page for Spark Web UI that shows statistics of a streaming job */
+private[ui] class StreamingPage(parent: StreamingTab)
+ extends WebUIPage("") with Logging {
+
+ private val listener = parent.listener
+ private val startTime = Calendar.getInstance().getTime()
+ private val emptyCell = "-"
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val content =
+ generateBasicStats() ++ <br></br> ++
+ <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
+ generateNetworkStatsTable() ++
+ generateBatchStatsTable()
+ UIUtils.headerSparkPage(
+ content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
+ }
+
+ /** Generate basic stats of the streaming program */
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+
+ -
+ Started at: {startTime.toString}
+
+ -
+ Time since start: {formatDurationVerbose(timeSinceStart)}
+
+ -
+ Network receivers: {listener.numNetworkReceivers}
+
+ -
+ Batch interval: {formatDurationVerbose(listener.batchDuration)}
+
+ -
+ Processed batches: {listener.numTotalCompletedBatches}
+
+ -
+ Waiting batches: {listener.numUnprocessedBatches}
+
+
+ }
+
+ /** Generate stats of data received over the network by the streaming program */
+ private def generateNetworkStatsTable(): Seq[Node] = {
+ val receivedRecordDistributions = listener.receivedRecordsDistributions
+ val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
+ val table = if (receivedRecordDistributions.size > 0) {
+ val headerRow = Seq(
+ "Receiver",
+ "Location",
+ "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
+ "Minimum rate\n[records/sec]",
+ "25th percentile rate\n[records/sec]",
+ "Median rate\n[records/sec]",
+ "75th percentile rate\n[records/sec]",
+ "Maximum rate\n[records/sec]"
+ )
+ val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+ val receiverInfo = listener.receiverInfo(receiverId)
+ val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+ val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+ val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+ val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
+ d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+ }.getOrElse {
+ Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+ }
+ Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+ }
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h4>Network Input Statistics</h4> ++
+ <div>{table.getOrElse("No network receivers")}</div>
+
+ content
+ }
+
+ /** Generate stats of batch jobs of the streaming program */
+ private def generateBatchStatsTable(): Seq[Node] = {
+ val numBatches = listener.retainedCompletedBatches.size
+ val lastCompletedBatch = listener.lastCompletedBatch
+ val table = if (numBatches > 0) {
+ val processingDelayQuantilesRow = {
+ Seq(
+ "Processing Time",
+ formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
+ ) ++ getQuantiles(listener.processingDelayDistribution)
+ }
+ val schedulingDelayQuantilesRow = {
+ Seq(
+ "Scheduling Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
+ ) ++ getQuantiles(listener.schedulingDelayDistribution)
+ }
+ val totalDelayQuantilesRow = {
+ Seq(
+ "Total Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
+ ) ++ getQuantiles(listener.totalDelayDistribution)
+ }
+ val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
+ "Median", "75th percentile", "Maximum")
+ val dataRows: Seq[Seq[String]] = Seq(
+ processingDelayQuantilesRow,
+ schedulingDelayQuantilesRow,
+ totalDelayQuantilesRow
+ )
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h4>Batch Processing Statistics</h4> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
+ */
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+ /** Get quantiles for any time distribution */
+ private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
+ timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+ }
+
+ /** Generate HTML table from string data */
+ private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+ <tr>{data.map(d => <td>{d}</td>)}</tr>
+ }
+ UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ }
+}
+
diff --git a/core/src/test/scala/org/apache/spark/SparkUISuite.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
similarity index 58%
rename from core/src/test/scala/org/apache/spark/SparkUISuite.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d0d119c15081d..51448d15c6516 100644
--- a/core/src/test/scala/org/apache/spark/SparkUISuite.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.streaming.ui
-import java.net.URI
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui.WebUITab
-import org.scalatest.FunSuite
+/** Spark Web UI tab that shows statistics of a streaming job */
+private[spark] class StreamingTab(ssc: StreamingContext)
+ extends WebUITab(ssc.sc.ui, "streaming") with Logging {
-class SparkUISuite extends FunSuite with SharedSparkContext {
+ val parent = ssc.sc.ui
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new StreamingJobProgressListener(ssc)
- test("verify appUIAddress contains the scheme") {
- val uiAddress = sc.ui.appUIAddress
- assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
- }
-
- test("verify appUIAddress contains the port") {
- val splitUIAddress = sc.ui.appUIAddress.split(':')
- assert(splitUIAddress(2).toInt == sc.ui.boundPort)
- }
+ ssc.addStreamingListener(listener)
+ attachPage(new StreamingPage(this))
+ parent.attachTab(this)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d4d5e4b..952511d411a8e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(0)
+ val serverSocket = new ServerSocket(portToBind)
val servingThread = new Thread() {
override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
def start() { servingThread.start() }
- def send(msg: String) { queue.add(msg) }
+ def send(msg: String) { queue.put(msg) }
def stop() { servingThread.interrupt() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef7f03b5..efd0d22ecb57a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
-
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000000000..35538ec188f67
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext("local", "test", Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ // test if the streaming tab exists
+ assert(html.toLowerCase.contains("streaming"))
+ // test if other Spark tabs still exist
+ assert(html.toLowerCase.contains("stages"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(html.toLowerCase.contains("batch"))
+ assert(html.toLowerCase.contains("network"))
+ }
+ }
+}