Commit: Other minor fixes

pwendell committed Aug 5, 2014
1 parent cc43f68 commit 93fbe0f
Showing 4 changed files with 10 additions and 6 deletions.
@@ -924,9 +924,9 @@ class DAGScheduler(
           AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
         }
       }
-      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-        event.taskMetrics))
     }
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+      event.taskMetrics))
     stage.pendingTasks -= task
     task match {
       case rt: ResultTask[_, _] =>
@@ -60,7 +60,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
-
     val stageData = stageIdToData.getOrElseUpdate(stageId, {
       logWarning("Stage completed for unknown stage " + stageId)
       new StageUIData
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -215,12 +215,16 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
     }
     val executorTable = new ExecutorTable(stageId, parent)
+
+    val maybeAccumulableTable: Seq[Node] =
+      if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+
     val content =
       summary ++
       <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
       <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
       <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
-      <h4>Accumulators</h4> ++ accumulableTable ++
+      maybeAccumulableTable ++
       <h4>Tasks</h4> ++ taskTable
 
     UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
5 changes: 3 additions & 2 deletions docs/programming-guide.md
@@ -1174,8 +1174,9 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l
 Accumulators are variables that are only "added" to through an associative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
 MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
-can add support for new types. Accumulator values are displayed in Spark's UI and can be
-useful for understanding the progress of running stages.
+can add support for new types. If accumulators are created with a name, they will be
+displayed in Spark's UI. This can be useful for understanding the progress of
+running stages (NOTE: this is not yet supported in Python).
 
 An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
 running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala and Python).
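The named-accumulator behavior this commit surfaces in the UI can be sketched as follows; this is a minimal Scala example against the Spark 1.x API, assuming a local-mode context (the app name and accumulator name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal local setup (illustrative; any existing SparkContext works).
val conf = new SparkConf().setAppName("NamedAccumulatorExample").setMaster("local[*]")
val sc = new SparkContext(conf)

// Passing a name as the second argument makes the accumulator
// show up on the stage detail page of Spark's web UI.
val accum = sc.accumulator(0, "My Counter")

// Tasks on the cluster may only add to the accumulator via `+=`.
sc.parallelize(1 to 4).foreach(x => accum += x)

// Only the driver can read the accumulated value.
println(accum.value)  // 10

sc.stop()
```

An unnamed accumulator (`sc.accumulator(0)`) behaves identically but is omitted from the UI, which is why `StagePage` above renders the Accumulators table only when `accumulables` is non-empty.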