From 1b7106b867bc0aa4d64b669d79b646f862acaf47 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 29 Apr 2015 18:22:14 -0700 Subject: [PATCH] [SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch This is an initial commit for SPARK-6862. Once SPARK-6796 is merged, I will add the links to StreamingPage so that the user can jump to BatchPage. Screenshots: ![success](https://cloud.githubusercontent.com/assets/1000778/7102439/bbe75406-e0b3-11e4-84fe-3e6de629a49a.png) ![failure](https://cloud.githubusercontent.com/assets/1000778/7102440/bc124454-e0b3-11e4-921a-c8b39d6b61bc.png) Author: zsxwing Closes #5473 from zsxwing/SPARK-6862 and squashes the following commits: 0727d35 [zsxwing] Change BatchUIData to a case class b380cfb [zsxwing] Add createJobStart to eliminate duplicate codes 9a3083d [zsxwing] Rename XxxDatas -> XxxData 087ba98 [zsxwing] Refactor BatchInfo to store only necessary fields cb62e4f [zsxwing] Use Seq[(OutputOpId, SparkJobId)] to store the id relations 72f8e7e [zsxwing] Add unit tests for BatchPage 1282b10 [zsxwing] Handle some corner cases and add tests for StreamingJobProgressListener 77a69ae [zsxwing] Refactor codes as per TD's comments 35ffd80 [zsxwing] Merge branch 'master' into SPARK-6862 15bdf9b [zsxwing] Add batch links and unit tests 4bf66b6 [zsxwing] Merge branch 'master' into SPARK-6862 7168807 [zsxwing] Limit the max width of the error message and fix nits in the UI 0b226f9 [zsxwing] Change 'Last Error' to 'Error' fc98a43 [zsxwing] Put clearing local properties to finally and remove redundant private[streaming] 0c7b2eb [zsxwing] Add BatchPage to display details of a batch --- .../org/apache/spark/ui/jobs/UIData.scala | 2 +- .../spark/streaming/dstream/DStream.scala | 2 +- .../spark/streaming/scheduler/Job.scala | 44 ++- .../streaming/scheduler/JobScheduler.scala | 28 +- .../spark/streaming/scheduler/JobSet.scala | 2 +- .../spark/streaming/ui/AllBatchesTable.scala | 26 +- .../apache/spark/streaming/ui/BatchPage.scala | 264 ++++++++++++++++++ .../spark/streaming/ui/BatchUIData.scala | 75 +++++ .../ui/StreamingJobProgressListener.scala | 161 ++++++++--- .../spark/streaming/ui/StreamingTab.scala | 4 +- .../spark/streaming/UISeleniumSuite.scala | 83 +++++- .../StreamingJobProgressListenerSuite.scala | 100 ++++++- 12 files changed, 710 insertions(+), 81 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 711a3697bda15..935c8a4f80e7b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet import scala.collection.mutable.HashMap -private[jobs] object UIData { +private[spark] object UIData { class ExecutorSummary { var taskTime : Long = 0 diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 24f99a2b929f5..83d41f5762444 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] ( println("Time: " + time) println("-------------------------------------------") firstNum.take(num).foreach(println) - if (firstNum.size > num) println("...") + if (firstNum.length > num) println("...") println() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index 30cf87f5b7dd1..3c481bf3491f9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -25,15 +25,49 @@ import scala.util.Try */ private[streaming] class Job(val time: Time, func: () => _) { - var id: String = _ - var result: Try[_] = null + private var _id: String = _ + private var _outputOpId: Int = _ + private var isSet = false + private var _result: Try[_] = null def run() { - result = Try(func()) + _result = Try(func()) } - def setId(number: Int) { - id = "streaming job " + time + "." + number + def result: Try[_] = { + if (_result == null) { + throw new IllegalStateException("Cannot access result before job finishes") + } + _result + } + + /** + * @return the global unique id of this Job. + */ + def id: String = { + if (!isSet) { + throw new IllegalStateException("Cannot access id before calling setId") + } + _id + } + + /** + * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet. + */ + def outputOpId: Int = { + if (!isSet) { + throw new IllegalStateException("Cannot access number before calling setId") + } + _outputOpId + } + + def setOutputOpId(outputOpId: Int) { + if (isSet) { + throw new IllegalStateException("Cannot call setOutputOpId more than once") + } + isSet = true + _id = s"streaming job $time.$outputOpId" + _outputOpId = outputOpId } override def toString: String = id diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 508b89278dcba..c7a2c1141a128 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -172,16 +172,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ssc.waiter.notifyError(e) } - private class JobHandler(job: Job) extends Runnable { + private class JobHandler(job: Job) extends Runnable with Logging { def run() { - eventLoop.post(JobStarted(job)) - // Disable checks for existing output directories in jobs launched by the streaming scheduler, - // since we may need to write output to an existing directory during checkpoint recovery; - // see SPARK-4835 for more details. - PairRDDFunctions.disableOutputSpecValidation.withValue(true) { - job.run() + ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) + ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) + try { + eventLoop.post(JobStarted(job)) + // Disable checks for existing output directories in jobs launched by the streaming + // scheduler, since we may need to write output to an existing directory during checkpoint + // recovery; see SPARK-4835 for more details. + PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + job.run() + } + eventLoop.post(JobCompleted(job)) + } finally { + ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) + ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) } - eventLoop.post(JobCompleted(job)) } } } + +private[streaming] object JobScheduler { + val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime" + val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId" +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 5b134877d0b2d..24b3794236ea5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -35,7 +35,7 @@ case class JobSet( private var processingStartTime = -1L // when the first job of this jobset started processing private var processingEndTime = -1L // when the last job of this jobset finished processing - jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) } + jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) } incompleteJobs ++= jobs def handleJobStart(job: Job) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index df1c0a10704c3..e219e27785533 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui import scala.xml.Node -import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.ui.UIUtils private[ui] abstract class BatchTableBase(tableId: String) { @@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) { Processing Time } - protected def baseRow(batch: BatchInfo): Seq[Node] = { + protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds) - val eventCount = batch.receivedBlockInfo.values.map { - receivers => receivers.map(_.numRecords).sum - }.sum + val eventCount = batch.numRecords val schedulingDelay = batch.schedulingDelay val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-") val processingTime = batch.processingDelay val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-") - {formattedBatchTime} + + + {formattedBatchTime} + + {eventCount.toString} events {formattedSchedulingDelay} @@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) { protected def renderRows: Seq[Node] } -private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo]) - extends BatchTableBase("active-batches-table") { +private[ui] class ActiveBatchTable( + runningBatches: Seq[BatchUIData], + waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") { override protected def columns: Seq[Node] = super.columns ++ Status @@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche runningBatches.flatMap(batch => {runningBatchRow(batch)}) } - private def runningBatchRow(batch: BatchInfo): Seq[Node] = { + private def runningBatchRow(batch: BatchUIData): Seq[Node] = { baseRow(batch) ++ processing } - private def waitingBatchRow(batch: BatchInfo): Seq[Node] = { + private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { baseRow(batch) ++ queued } } -private[ui] class CompletedBatchTable(batches: Seq[BatchInfo]) +private[ui] class CompletedBatchTable(batches: Seq[BatchUIData]) extends BatchTableBase("completed-batches-table") { override protected def columns: Seq[Node] = super.columns ++ Total Delay @@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo]) batches.flatMap(batch => {completedBatchRow(batch)}) } - private def completedBatchRow(batch: BatchInfo): Seq[Node] = { + private def completedBatchRow(batch: BatchUIData): Seq[Node] = { val totalDelay = batch.totalDelay val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-") baseRow(batch) ++ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala new file mode 100644 index 0000000000000..2da9a29e2529e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{NodeSeq, Node} + +import org.apache.commons.lang3.StringEscapeUtils + +import org.apache.spark.streaming.Time +import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId} +import org.apache.spark.ui.jobs.UIData.JobUIData + + +private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { + private val streamingListener = parent.listener + private val sparkListener = parent.ssc.sc.jobProgressListener + + private def columns: Seq[Node] = { + Output Op Id + Description + Duration + Job Id + Duration + Stages: Succeeded/Total + Tasks (for all stages): Succeeded/Total + Error + } + + /** + * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into + * one cell, we use "rowspan" for the first row of a output op. + */ + def generateJobRow( + outputOpId: OutputOpId, + formattedOutputOpDuration: String, + numSparkJobRowsInOutputOp: Int, + isFirstRow: Boolean, + sparkJob: JobUIData): Seq[Node] = { + val lastStageInfo = Option(sparkJob.stageIds) + .filter(_.nonEmpty) + .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) } + val lastStageData = lastStageInfo.flatMap { s => + sparkListener.stageIdToData.get((s.stageId, s.attemptId)) + } + + val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("") + val duration: Option[Long] = { + sparkJob.submissionTime.map { start => + val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) + end - start + } + } + val lastFailureReason = + sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get). + dropWhile(_.failureReason == None).take(1). // get the first info that contains failure + flatMap(info => info.failureReason).headOption.getOrElse("") + val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-") + val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}" + + // In the first row, output op id and its information needs to be shown. In other rows, these + // cells will be taken up due to "rowspan". + // scalastyle:off + val prefixCells = + if (isFirstRow) { + {outputOpId.toString} + + + {lastStageDescription} + {lastStageName} + + {formattedOutputOpDuration} + } else { + Nil + } + // scalastyle:on + + + {prefixCells} + + + {sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")} + + + + {formattedDuration} + + + {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages} + {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"} + {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"} + + + { + UIUtils.makeProgressBar( + started = sparkJob.numActiveTasks, + completed = sparkJob.numCompletedTasks, + failed = sparkJob.numFailedTasks, + skipped = sparkJob.numSkippedTasks, + total = sparkJob.numTasks - sparkJob.numSkippedTasks) + } + + {failureReasonCell(lastFailureReason)} + + } + + private def generateOutputOpIdRow( + outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = { + val sparkjobDurations = sparkJobs.map(sparkJob => { + sparkJob.submissionTime.map { start => + val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) + end - start + } + }) + val formattedOutputOpDuration = + if (sparkjobDurations.exists(_ == None)) { + // If any job does not finish, set "formattedOutputOpDuration" to "-" + "-" + } else { + UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum) + } + generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++ + sparkJobs.tail.map { sparkJob => + generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob) + }.flatMap(x => x) + } + + private def failureReasonCell(failureReason: String): Seq[Node] = { + val isMultiline = failureReason.indexOf('\n') >= 0 + // Display the first line by default + val failureReasonSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + failureReason.substring(0, failureReason.indexOf('\n')) + } else { + failureReason + }) + val details = if (isMultiline) { + // scalastyle:off + + +details + ++ + + // scalastyle:on + } else { + "" + } + {failureReasonSummary}{details} + } + + private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = { + sparkListener.activeJobs.get(sparkJobId).orElse { + sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse { + sparkListener.failedJobs.find(_.jobId == sparkJobId) + } + } + } + + /** + * Generate the job table for the batch. + */ + private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = { + val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq. + sortBy(_._1). // sorted by OutputOpId + map { case (outputOpId, outputOpIdAndSparkJobIds) => + // sort SparkJobIds for each OutputOpId + (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted) + } + sparkListener.synchronized { + val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = + outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) => + // Filter out spark Job ids that don't exist in sparkListener + (outputOpId, sparkJobIds.flatMap(getJobData)) + } + + + + {columns} + + + { + outputOpIdWithJobs.map { + case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs) + } + } + +
+ } + } + + def render(request: HttpServletRequest): Seq[Node] = { + val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse { + throw new IllegalArgumentException(s"Missing id parameter") + } + val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds) + + val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse { + throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist") + } + + val formattedSchedulingDelay = + batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-") + val formattedProcessingTime = + batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-") + val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-") + + val summary: NodeSeq = +
+
    +
  • + Batch Duration: + {UIUtils.formatDuration(streamingListener.batchDuration)} +
  • +
  • + Input data size: + {batchUIData.numRecords} records +
  • +
  • + Scheduling delay: + {formattedSchedulingDelay} +
  • +
  • + Processing time: + {formattedProcessingTime} +
  • +
  • + Total delay: + {formattedTotalDelay} +
  • +
+
+ + val jobTable = + if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) { +
Cannot find any job for Batch {formattedBatchTime}.
+ } else { + generateJobTable(batchUIData) + } + + val content = summary ++ jobTable + + UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala new file mode 100644 index 0000000000000..f45c291b7c0fe --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.streaming.ui + +import org.apache.spark.streaming.Time +import org.apache.spark.streaming.scheduler.BatchInfo +import org.apache.spark.streaming.ui.StreamingJobProgressListener._ + +private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId) + +private[ui] case class BatchUIData( + val batchTime: Time, + val receiverNumRecords: Map[Int, Long], + val submissionTime: Long, + val processingStartTime: Option[Long], + val processingEndTime: Option[Long], + var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) { + + /** + * Time taken for the first job of this batch to start processing from the time this batch + * was submitted to the streaming scheduler. Essentially, it is + * `processingStartTime` - `submissionTime`. + */ + def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime) + + /** + * Time taken for the all jobs of this batch to finish processing from the time they started + * processing. Essentially, it is `processingEndTime` - `processingStartTime`. + */ + def processingDelay: Option[Long] = { + for (start <- processingStartTime; + end <- processingEndTime) + yield end - start + } + + /** + * Time taken for all the jobs of this batch to finish processing from the time they + * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. + */ + def totalDelay: Option[Long] = processingEndTime.map(_ - submissionTime) + + /** + * The number of recorders received by the receivers in this batch. + */ + def numRecords: Long = receiverNumRecords.map(_._2).sum +} + +private[ui] object BatchUIData { + + def apply(batchInfo: BatchInfo): BatchUIData = { + new BatchUIData( + batchInfo.batchTime, + batchInfo.receivedBlockInfo.mapValues(_.map(_.numRecords).sum), + batchInfo.submissionTime, + batchInfo.processingStartTime, + batchInfo.processingEndTime + ) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index be1e8686cf9fa..34b55717a1db2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -17,29 +17,58 @@ package org.apache.spark.streaming.ui -import scala.collection.mutable.{Queue, HashMap} +import java.util.LinkedHashMap +import java.util.{Map => JMap} +import java.util.Properties +import scala.collection.mutable.{ArrayBuffer, Queue, HashMap, SynchronizedBuffer} + +import org.apache.spark.scheduler._ import org.apache.spark.streaming.{Time, StreamingContext} import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted -import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) - extends StreamingListener { + extends StreamingListener with SparkListener { - private val waitingBatchInfos = new HashMap[Time, BatchInfo] - private val runningBatchInfos = new HashMap[Time, BatchInfo] - private val completedBatchInfos = new Queue[BatchInfo] - private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) + private val waitingBatchUIData = new HashMap[Time, BatchUIData] + private val runningBatchUIData = new HashMap[Time, BatchUIData] + private val completedBatchUIData = new Queue[BatchUIData] + private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) private var totalCompletedBatches = 0L private var totalReceivedRecords = 0L private var totalProcessedRecords = 0L private val receiverInfos = new HashMap[Int, ReceiverInfo] + // Because onJobStart and onBatchXXX messages are processed in different threads, + // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we + // cannot use a map of (Time, BatchUIData). + private[ui] val batchTimeToOutputOpIdSparkJobIdPair = + new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] { + override def removeEldestEntry( + p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = { + // If a lot of "onBatchCompleted"s happen before "onJobStart" (image if + // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds" + // may add some information for a removed batch when processing "onJobStart". It will be a + // memory leak. + // + // To avoid the memory leak, we control the size of "batchTimeToOutputOpIdToSparkJobIds" and + // evict the eldest one. + // + // Note: if "onJobStart" happens before "onBatchSubmitted", the size of + // "batchTimeToOutputOpIdToSparkJobIds" may be greater than the number of the retained + // batches temporarily, so here we use "10" to handle such case. This is not a perfect + // solution, but at least it can handle most of cases. + size() > + waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10 + } + } + + val batchDuration = ssc.graph.batchDuration.milliseconds override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { @@ -62,37 +91,62 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = { synchronized { - waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo + waitingBatchUIData(batchSubmitted.batchInfo.batchTime) = + BatchUIData(batchSubmitted.batchInfo) } } override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized { - runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo - waitingBatchInfos.remove(batchStarted.batchInfo.batchTime) + val batchUIData = BatchUIData(batchStarted.batchInfo) + runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo) + waitingBatchUIData.remove(batchStarted.batchInfo.batchTime) - batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) => - totalReceivedRecords += infos.map(_.numRecords).sum - } + totalReceivedRecords += batchUIData.numRecords } override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { synchronized { - waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) - runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) - completedBatchInfos.enqueue(batchCompleted.batchInfo) - if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue() + waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime) + runningBatchUIData.remove(batchCompleted.batchInfo.batchTime) + val batchUIData = BatchUIData(batchCompleted.batchInfo) + completedBatchUIData.enqueue(batchUIData) + if (completedBatchUIData.size > batchUIDataLimit) { + val removedBatch = completedBatchUIData.dequeue() + batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime) + } totalCompletedBatches += 1L - batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) => - totalProcessedRecords += infos.map(_.numRecords).sum + totalProcessedRecords += batchUIData.numRecords + } + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { + getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) => + var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime) + if (outputOpIdToSparkJobIds == null) { + outputOpIdToSparkJobIds = + new ArrayBuffer[OutputOpIdAndSparkJobId]() + with SynchronizedBuffer[OutputOpIdAndSparkJobId] + batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds) } + outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId) } } - def numReceivers: Int = synchronized { - ssc.graph.getReceiverInputStreams().size + private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = { + val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY) + if (batchTime == null) { + // Not submitted from JobScheduler + None + } else { + val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY) + assert(outputOpId != null) + Some(Time(batchTime.toLong) -> outputOpId.toInt) + } } + def numReceivers: Int = ssc.graph.getReceiverInputStreams().size + def numTotalCompletedBatches: Long = synchronized { totalCompletedBatches } @@ -106,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } def numUnprocessedBatches: Long = synchronized { - waitingBatchInfos.size + runningBatchInfos.size + waitingBatchUIData.size + runningBatchUIData.size } - def waitingBatches: Seq[BatchInfo] = synchronized { - waitingBatchInfos.values.toSeq + def waitingBatches: Seq[BatchUIData] = synchronized { + waitingBatchUIData.values.toSeq } - def runningBatches: Seq[BatchInfo] = synchronized { - runningBatchInfos.values.toSeq + def runningBatches: Seq[BatchUIData] = synchronized { + runningBatchUIData.values.toSeq } - def retainedCompletedBatches: Seq[BatchInfo] = synchronized { - completedBatchInfos.toSeq + def retainedCompletedBatches: Seq[BatchUIData] = synchronized { + completedBatchUIData.toSeq } def processingDelayDistribution: Option[Distribution] = synchronized { @@ -134,15 +188,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { - val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit) - val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) + val latestBatches = retainedBatches.reverse.take(batchUIDataLimit) (0 until numReceivers).map { receiverId => - val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo => - batchInfo.get(receiverId).getOrElse(Array.empty) - } - val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo => - // calculate records per second for each batch - blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration + val recordsOfParticularReceiver = latestBatches.map { batch => + // calculate records per second for each batch + batch.receiverNumRecords.get(receiverId).sum.toDouble * 1000 / batchDuration } val distributionOption = Distribution(recordsOfParticularReceiver) (receiverId, distributionOption) @@ -150,10 +200,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } def lastReceivedBatchRecords: Map[Int, Long] = synchronized { - val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) + val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords) lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => (0 until numReceivers).map { receiverId => - (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum) + (receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L)) }.toMap }.getOrElse { (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap @@ -164,20 +214,39 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) receiverInfos.get(receiverId) } - def lastCompletedBatch: Option[BatchInfo] = synchronized { - completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption + def lastCompletedBatch: Option[BatchUIData] = synchronized { + completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption } - def lastReceivedBatch: Option[BatchInfo] = synchronized { + def lastReceivedBatch: Option[BatchUIData] = synchronized { retainedBatches.lastOption } - private def retainedBatches: Seq[BatchInfo] = { - (waitingBatchInfos.values.toSeq ++ - runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering) + private def retainedBatches: Seq[BatchUIData] = { + (waitingBatchUIData.values.toSeq ++ + runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering) + } + + private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = { + Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble)) } - private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { - Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized { + val batchUIData = waitingBatchUIData.get(batchTime).orElse { + runningBatchUIData.get(batchTime).orElse { + completedBatchUIData.find(batch => batch.batchTime == batchTime) + } + } + batchUIData.foreach { _batchUIData => + val outputOpIdToSparkJobIds = + Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty) + _batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds + } + batchUIData } } + +private[streaming] object StreamingJobProgressListener { + type SparkJobId = Int + type OutputOpId = Int +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 9a860ea4a6c68..e4039639adbad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -27,14 +27,16 @@ import StreamingTab._ * Spark Web UI tab that shows statistics of a streaming job. * This assumes the given SparkContext has enabled its SparkUI. */ -private[spark] class StreamingTab(ssc: StreamingContext) +private[spark] class StreamingTab(val ssc: StreamingContext) extends SparkUITab(getSparkUI(ssc), "streaming") with Logging { val parent = getSparkUI(ssc) val listener = ssc.progressListener ssc.addStreamingListener(listener) + ssc.sc.addSparkListener(listener) attachPage(new StreamingPage(this)) + attachPage(new BatchPage(this)) parent.attachTab(this) def detach() { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 205ddf6dbe9b0..8de43baabc21d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming +import scala.collection.mutable.Queue + import org.openqa.selenium.WebDriver import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.scalatest._ @@ -60,8 +62,28 @@ class UISeleniumSuite ssc } + private def setupStreams(ssc: StreamingContext): Unit = { + val rdds = Queue(ssc.sc.parallelize(1 to 4, 4)) + val inputStream = ssc.queueStream(rdds) + inputStream.foreachRDD { rdd => + rdd.foreach(_ => {}) + rdd.foreach(_ => {}) + } + inputStream.foreachRDD { rdd => + rdd.foreach(_ => {}) + try { + rdd.foreach(_ => throw new RuntimeException("Oops")) + } catch { + case e: SparkException if e.getMessage.contains("Oops") => + } + } + } + test("attaching and detaching a Streaming tab") { withStreamingContext(newSparkStreamingContext()) { ssc => + setupStreams(ssc) + ssc.start() + val sparkUI = ssc.sparkContext.ui.get eventually(timeout(10 seconds), interval(50 milliseconds)) { @@ -77,8 +99,8 @@ class UISeleniumSuite statisticText should contain("Batch interval:") val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq - h4Text should contain("Active Batches (0)") - h4Text should contain("Completed Batches (last 0 out of 0)") + h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true) + h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true) findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status") @@ -86,6 +108,63 @@ class UISeleniumSuite findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay") } + + val batchLinks = + findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq + batchLinks.size should be >= 1 + + // Check a normal batch page + go to (batchLinks.last) // Last should be the first batch, so it will have some jobs + val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq + summaryText should contain ("Batch Duration:") + summaryText should contain ("Input data size:") + summaryText should contain ("Scheduling delay:") + summaryText should contain ("Processing time:") + summaryText should contain ("Total delay:") + + findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be { + List("Output Op Id", "Description", "Duration", "Job Id", "Duration", + "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error") + } + + // Check we have 2 output op ids + val outputOpIds = findAll(cssSelector(".output-op-id-cell")).toSeq + outputOpIds.map(_.attribute("rowspan")) should be (List(Some("2"), Some("2"))) + outputOpIds.map(_.text) should be (List("0", "1")) + + // Check job ids + val jobIdCells = findAll(cssSelector( """#batch-job-table a""")).toSeq + jobIdCells.map(_.text) should be (List("0", "1", "2", "3")) + + val jobLinks = jobIdCells.flatMap(_.attribute("href")) + jobLinks.size should be (4) + + // Check stage progress + findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be + (List("1/1", "1/1", "1/1", "0/1 (1 failed)")) + + // Check job progress + findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be + (List("1/1", "1/1", "1/1", "0/1 (1 failed)")) + + // Check stacktrace + val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq + errorCells should have size 1 + errorCells(0) should include("java.lang.RuntimeException: Oops") + + // Check the job link in the batch page is right + go to (jobLinks(0)) + val jobDetails = findAll(cssSelector("li strong")).map(_.text).toSeq + jobDetails should contain("Status:") + jobDetails should contain("Completed Stages:") + + // Check a batch page without id + go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/") + webDriver.getPageSource should include ("Missing id parameter") + + // Check a non-exist batch + go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/?id=12345") + webDriver.getPageSource should include ("does not exist") } ssc.stop(false) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 94b1985116feb..fa89536de4054 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark.streaming.ui +import java.util.Properties + import org.scalatest.Matchers +import org.apache.spark.scheduler.SparkListenerJobStart import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase} @@ -28,6 +31,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val input = (1 to 4).map(Seq(_)).toSeq val operation = (d: DStream[Int]) => d.map(x => x) + private def createJobStart( + batchTime: Time, outputOpId: Int, jobId: Int): SparkListenerJobStart = { + val properties = new Properties() + properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, batchTime.milliseconds.toString) + properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, outputOpId.toString) + SparkListenerJobStart(jobId = jobId, + 0L, // unused + Nil, // unused + properties) + } + override def batchDuration: Duration = Milliseconds(100) test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " + @@ -43,7 +57,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // onBatchSubmitted val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) - listener.waitingBatches should be (List(batchInfoSubmitted)) + listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (Nil) listener.lastCompletedBatch should be (None) @@ -56,7 +70,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) listener.waitingBatches should be (Nil) - listener.runningBatches should be (List(batchInfoStarted)) + listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) listener.retainedCompletedBatches should be (Nil) listener.lastCompletedBatch should be (None) listener.numUnprocessedBatches should be (1) @@ -64,13 +78,40 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.numTotalProcessedRecords should be (0) listener.numTotalReceivedRecords should be (600) + // onJobStart + val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0) + listener.onJobStart(jobStart1) + + val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1) + listener.onJobStart(jobStart2) + + val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0) + listener.onJobStart(jobStart3) + + val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1) + listener.onJobStart(jobStart4) + + val batchUIData = listener.getBatchUIData(Time(1000)) + batchUIData should not be None + batchUIData.get.batchTime should be (batchInfoStarted.batchTime) + batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay) + batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay) + batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay) + batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L)) + batchUIData.get.numRecords should be(600) + batchUIData.get.outputOpIdSparkJobIdPairs should be + Seq(OutputOpIdAndSparkJobId(0, 0), + OutputOpIdAndSparkJobId(0, 1), + OutputOpIdAndSparkJobId(1, 0), + OutputOpIdAndSparkJobId(1, 1)) + // onBatchCompleted val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) listener.waitingBatches should be (Nil) listener.runningBatches should be (Nil) - listener.retainedCompletedBatches should be (List(batchInfoCompleted)) - listener.lastCompletedBatch should be (Some(batchInfoCompleted)) + listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted))) + listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.numUnprocessedBatches should be (0) listener.numTotalCompletedBatches should be (1) listener.numTotalProcessedRecords should be (600) @@ -116,4 +157,55 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.retainedCompletedBatches.size should be (limit) listener.numTotalCompletedBatches should be(limit + 10) } + + test("out-of-order onJobStart and onBatchXXX") { + val ssc = setupStreams(input, operation) + val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) + val listener = new StreamingJobProgressListener(ssc) + + // fulfill completedBatchInfos + for(i <- 0 until limit) { + val batchInfoCompleted = + BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None) + listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) + val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1) + listener.onJobStart(jobStart) + } + + // onJobStart happens before onBatchSubmitted + val jobStart = createJobStart(Time(1000 + limit * 100), outputOpId = 0, jobId = 0) + listener.onJobStart(jobStart) + + val batchInfoSubmitted = + BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None) + listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) + + // We still can see the info retrieved from onJobStart + val batchUIData = listener.getBatchUIData(Time(1000 + limit * 100)) + batchUIData should not be None + batchUIData.get.batchTime should be (batchInfoSubmitted.batchTime) + batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay) + batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay) + batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay) + batchUIData.get.receiverNumRecords should be (Map.empty) + batchUIData.get.numRecords should be (0) + batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0))) + + // A lot of "onBatchCompleted"s happen before "onJobStart" + for(i <- limit + 1 to limit * 2) { + val batchInfoCompleted = + BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None) + listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) + } + + for(i <- limit + 1 to limit * 2) { + val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1) + listener.onJobStart(jobStart) + } + + // We should not leak memory + listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <= + (listener.waitingBatches.size + listener.runningBatches.size + + listener.retainedCompletedBatches.size + 10) + } }