[SPARK-27071][CORE] Expose additional metrics in status.api.v1.StageData
## What changes were proposed in this pull request?

This PR exposes additional metrics in `status.api.v1.StageData`. These metrics were already computed for `LiveStage`, but they were never exposed to the user. The newly exposed fields cover executor (de)serialization time, JVM GC time, result size and serialization time, peak execution memory, and detailed shuffle read and write metrics (blocks fetched, fetch wait time, remote/local bytes read, and shuffle write time).
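
Once exposed, these fields should appear in the JSON returned by the stages REST endpoint. Below is a minimal sketch of checking for them; the host, port, and application ID are placeholders, and the field list assumes exactly the additions made in this PR:

```scala
import scala.io.Source

object StageMetricsCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder history-server endpoint and application ID; adjust as needed.
    val url = "http://localhost:18080/api/v1/applications/app-20190527000000-0000/stages"
    val json = Source.fromURL(url).mkString

    // Metrics newly exposed by this change (previously only tracked internally).
    val newFields = Seq(
      "executorDeserializeTime", "executorDeserializeCpuTime", "resultSize",
      "jvmGcTime", "resultSerializationTime", "peakExecutionMemory",
      "shuffleRemoteBlocksFetched", "shuffleLocalBlocksFetched",
      "shuffleFetchWaitTime", "shuffleRemoteBytesRead",
      "shuffleRemoteBytesReadToDisk", "shuffleLocalBytesRead", "shuffleWriteTime")

    newFields.foreach { field =>
      val present = json.contains("\"" + field + "\"")
      println(s"$field present: $present")
    }
  }
}
```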

## How was this patch tested?

Existing tests.
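
The JSON expectation files updated below now carry the new fields, so the existing REST API suites pick them up. As an illustration only (not the actual test code), the kind of check involved could look like this with json4s, using a fragment of the expectation JSON shown further down:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object StageJsonFieldCheck {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // Illustrative subset of the one-stage expectation JSON below.
    val stageJsonText =
      """{"jvmGcTime": 27, "shuffleWriteTime": 8711515, "peakExecutionMemory": 0}"""
    val stage = parse(stageJsonText)

    // The newly exposed metrics should deserialize as plain Long values.
    assert((stage \ "jvmGcTime").extract[Long] == 27L)
    assert((stage \ "shuffleWriteTime").extract[Long] == 8711515L)
  }
}
```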

cc hvanhovell

Closes #24011 from tomvanbussel/SPARK-27071.

Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: herman <herman@databricks.com>
tomvanbussel authored and hvanhovell committed May 27, 2019
1 parent 32461d4 commit 00a8c85
Showing 14 changed files with 424 additions and 152 deletions.
81 changes: 47 additions & 34 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -462,40 +462,53 @@ private[spark] class AppStatusStore(
.toMap

new v1.StageData(
stage.status,
stage.stageId,
stage.attemptId,
stage.numTasks,
stage.numActiveTasks,
stage.numCompleteTasks,
stage.numFailedTasks,
stage.numKilledTasks,
stage.numCompletedIndices,
stage.executorRunTime,
stage.executorCpuTime,
stage.submissionTime,
stage.firstTaskLaunchedTime,
stage.completionTime,
stage.failureReason,
stage.inputBytes,
stage.inputRecords,
stage.outputBytes,
stage.outputRecords,
stage.shuffleReadBytes,
stage.shuffleReadRecords,
stage.shuffleWriteBytes,
stage.shuffleWriteRecords,
stage.memoryBytesSpilled,
stage.diskBytesSpilled,
stage.name,
stage.description,
stage.details,
stage.schedulingPool,
stage.rddIds,
stage.accumulatorUpdates,
Some(tasks),
Some(executorSummary(stage.stageId, stage.attemptId)),
stage.killedTasksSummary)
status = stage.status,
stageId = stage.stageId,
attemptId = stage.attemptId,
numTasks = stage.numTasks,
numActiveTasks = stage.numActiveTasks,
numCompleteTasks = stage.numCompleteTasks,
numFailedTasks = stage.numFailedTasks,
numKilledTasks = stage.numKilledTasks,
numCompletedIndices = stage.numCompletedIndices,
submissionTime = stage.submissionTime,
firstTaskLaunchedTime = stage.firstTaskLaunchedTime,
completionTime = stage.completionTime,
failureReason = stage.failureReason,
executorDeserializeTime = stage.executorDeserializeTime,
executorDeserializeCpuTime = stage.executorDeserializeCpuTime,
executorRunTime = stage.executorRunTime,
executorCpuTime = stage.executorCpuTime,
resultSize = stage.resultSize,
jvmGcTime = stage.jvmGcTime,
resultSerializationTime = stage.resultSerializationTime,
memoryBytesSpilled = stage.memoryBytesSpilled,
diskBytesSpilled = stage.diskBytesSpilled,
peakExecutionMemory = stage.peakExecutionMemory,
inputBytes = stage.inputBytes,
inputRecords = stage.inputRecords,
outputBytes = stage.outputBytes,
outputRecords = stage.outputRecords,
shuffleRemoteBlocksFetched = stage.shuffleRemoteBlocksFetched,
shuffleLocalBlocksFetched = stage.shuffleLocalBlocksFetched,
shuffleFetchWaitTime = stage.shuffleFetchWaitTime,
shuffleRemoteBytesRead = stage.shuffleRemoteBytesRead,
shuffleRemoteBytesReadToDisk = stage.shuffleRemoteBytesReadToDisk,
shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
shuffleReadBytes = stage.shuffleReadBytes,
shuffleReadRecords = stage.shuffleReadRecords,
shuffleWriteBytes = stage.shuffleWriteBytes,
shuffleWriteTime = stage.shuffleWriteTime,
shuffleWriteRecords = stage.shuffleWriteRecords,
name = stage.name,
description = stage.description,
details = stage.details,
schedulingPool = stage.schedulingPool,
rddIds = stage.rddIds,
accumulatorUpdates = stage.accumulatorUpdates,
tasks = Some(tasks),
executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
killedTasksSummary = stage.killedTasksSummary)
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
92 changes: 53 additions & 39 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -394,45 +394,59 @@ private class LiveStage extends LiveEntity {

def toApi(): v1.StageData = {
new v1.StageData(
status,
info.stageId,
info.attemptNumber,

info.numTasks,
activeTasks,
completedTasks,
failedTasks,
killedTasks,
completedIndices.size,

metrics.executorRunTime,
metrics.executorCpuTime,
info.submissionTime.map(new Date(_)),
if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
info.completionTime.map(new Date(_)),
info.failureReason,

metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,

info.name,
description,
info.details,
schedulingPool,

info.rddInfos.map(_.id),
newAccumulatorInfos(info.accumulables.values),
None,
None,
killedSummary)
status = status,
stageId = info.stageId,
attemptId = info.attemptNumber,
numTasks = info.numTasks,
numActiveTasks = activeTasks,
numCompleteTasks = completedTasks,
numFailedTasks = failedTasks,
numKilledTasks = killedTasks,
numCompletedIndices = completedIndices.size,

submissionTime = info.submissionTime.map(new Date(_)),
firstTaskLaunchedTime =
if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
completionTime = info.completionTime.map(new Date(_)),
failureReason = info.failureReason,

executorDeserializeTime = metrics.executorDeserializeTime,
executorDeserializeCpuTime = metrics.executorDeserializeCpuTime,
executorRunTime = metrics.executorRunTime,
executorCpuTime = metrics.executorCpuTime,
resultSize = metrics.resultSize,
jvmGcTime = metrics.jvmGcTime,
resultSerializationTime = metrics.resultSerializationTime,
memoryBytesSpilled = metrics.memoryBytesSpilled,
diskBytesSpilled = metrics.diskBytesSpilled,
peakExecutionMemory = metrics.peakExecutionMemory,
inputBytes = metrics.inputMetrics.bytesRead,
inputRecords = metrics.inputMetrics.recordsRead,
outputBytes = metrics.outputMetrics.bytesWritten,
outputRecords = metrics.outputMetrics.recordsWritten,
shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched,
shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched,
shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime,
shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead,
shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk,
shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead,
shuffleReadBytes =
metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
shuffleReadRecords = metrics.shuffleReadMetrics.recordsRead,
shuffleWriteBytes = metrics.shuffleWriteMetrics.bytesWritten,
shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime,
shuffleWriteRecords = metrics.shuffleWriteMetrics.recordsWritten,

name = info.name,
description = description,
details = info.details,
schedulingPool = schedulingPool,

rddIds = info.rddInfos.map(_.id),
accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
tasks = None,
executorSummary = None,
killedTasksSummary = killedSummary)
}

override protected def doUpdate(): Any = {
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -210,23 +210,36 @@ class StageData private[spark](
val numKilledTasks: Int,
val numCompletedIndices: Int,

val executorRunTime: Long,
val executorCpuTime: Long,
val submissionTime: Option[Date],
val firstTaskLaunchedTime: Option[Date],
val completionTime: Option[Date],
val failureReason: Option[String],

val executorDeserializeTime: Long,
val executorDeserializeCpuTime: Long,
val executorRunTime: Long,
val executorCpuTime: Long,
val resultSize: Long,
val jvmGcTime: Long,
val resultSerializationTime: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val peakExecutionMemory: Long,
val inputBytes: Long,
val inputRecords: Long,
val outputBytes: Long,
val outputRecords: Long,
val shuffleRemoteBlocksFetched: Long,
val shuffleLocalBlocksFetched: Long,
val shuffleFetchWaitTime: Long,
val shuffleRemoteBytesRead: Long,
val shuffleRemoteBytesReadToDisk: Long,
val shuffleLocalBytesRead: Long,
val shuffleReadBytes: Long,
val shuffleReadRecords: Long,
val shuffleWriteBytes: Long,
val shuffleWriteTime: Long,
val shuffleWriteRecords: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,

val name: String,
val description: Option[String],
65 changes: 51 additions & 14 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,20 +203,57 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
// stage or if the stage information has been garbage collected
store.asOption(store.lastStageAttempt(stageId)).getOrElse {
new v1.StageData(
v1.StageStatus.PENDING,
stageId,
0, 0, 0, 0, 0, 0, 0,
0L, 0L, None, None, None, None,
0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
"Unknown",
None,
"Unknown",
null,
Nil,
Nil,
None,
None,
Map())
status = v1.StageStatus.PENDING,
stageId = stageId,
attemptId = 0,
numTasks = 0,
numActiveTasks = 0,
numCompleteTasks = 0,
numFailedTasks = 0,
numKilledTasks = 0,
numCompletedIndices = 0,

submissionTime = None,
firstTaskLaunchedTime = None,
completionTime = None,
failureReason = None,

executorDeserializeTime = 0L,
executorDeserializeCpuTime = 0L,
executorRunTime = 0L,
executorCpuTime = 0L,
resultSize = 0L,
jvmGcTime = 0L,
resultSerializationTime = 0L,
memoryBytesSpilled = 0L,
diskBytesSpilled = 0L,
peakExecutionMemory = 0L,
inputBytes = 0L,
inputRecords = 0L,
outputBytes = 0L,
outputRecords = 0L,
shuffleRemoteBlocksFetched = 0L,
shuffleLocalBlocksFetched = 0L,
shuffleFetchWaitTime = 0L,
shuffleRemoteBytesRead = 0L,
shuffleRemoteBytesReadToDisk = 0L,
shuffleLocalBytesRead = 0L,
shuffleReadBytes = 0L,
shuffleReadRecords = 0L,
shuffleWriteBytes = 0L,
shuffleWriteTime = 0L,
shuffleWriteRecords = 0L,

name = "Unknown",
description = None,
details = "Unknown",
schedulingPool = null,

rddIds = Nil,
accumulatorUpdates = Nil,
tasks = None,
executorSummary = None,
killedTasksSummary = Map())
}
}

@@ -8,21 +8,34 @@
"numFailedTasks" : 2,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"submissionTime" : "2018-01-09T10:21:18.152GMT",
"firstTaskLaunchedTime" : "2018-01-09T10:21:18.347GMT",
"completionTime" : "2018-01-09T10:21:19.062GMT",
"executorDeserializeTime" : 327,
"executorDeserializeCpuTime" : 225900000,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"resultSize" : 10376,
"jvmGcTime" : 27,
"resultSerializationTime" : 1,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 460,
"shuffleWriteTime" : 8711515,
"shuffleWriteRecords" : 10,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:26",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:34)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:36)\n$line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:38)\n$line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line17.$read$$iw$$iw$$iw.<init>(<console>:42)\n$line17.$read$$iw$$iw.<init>(<console>:44)\n$line17.$read$$iw.<init>(<console>:46)\n$line17.$read.<init>(<console>:48)\n$line17.$read$.<init>(<console>:52)\n$line17.$read$.<clinit>(<console>)\n$line17.$eval$.$print$lzycompute(<console>:7)\n$line17.$eval$.$print(<console>:6)\n$line17.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
@@ -8,21 +8,34 @@
"numFailedTasks" : 4,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"submissionTime" : "2018-01-18T18:33:12.658GMT",
"firstTaskLaunchedTime" : "2018-01-18T18:33:12.816GMT",
"completionTime" : "2018-01-18T18:33:15.279GMT",
"executorDeserializeTime" : 3679,
"executorDeserializeCpuTime" : 1029819716,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"resultSize" : 10824,
"jvmGcTime" : 370,
"resultSerializationTime" : 5,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 1461,
"shuffleWriteTime" : 33251697,
"shuffleWriteRecords" : 30,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:27",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line15.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line15.$read$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line15.$read$$iw$$iw$$iw.<init>(<console>:43)\n$line15.$read$$iw$$iw.<init>(<console>:45)\n$line15.$read$$iw.<init>(<console>:47)\n$line15.$read.<init>(<console>:49)\n$line15.$read$.<init>(<console>:53)\n$line15.$read$.<clinit>(<console>)\n$line15.$eval$.$print$lzycompute(<console>:7)\n$line15.$eval$.$print(<console>:6)\n$line15.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
@@ -864,4 +877,4 @@
}
},
"killedTasksSummary" : { }
}
}