[SPARK-27071][CORE] Expose additional metrics in status.api.v1.StageData #24011

Closed · wants to merge 5 commits
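This change widens v1.StageData with the aggregated task metrics shown in the diffs below: executor deserialize time and CPU time, result size, JVM GC time, result serialization time, peak execution memory, the shuffle read breakdown (blocks, bytes, fetch-wait time), and shuffle write time. They become visible wherever StageData is rendered, including the stages endpoint of the monitoring REST API. A minimal Scala sketch of fetching the enriched payload, assuming a history server on localhost:18080; the application and stage IDs are placeholders:

import scala.io.Source

object StageMetricsProbe {
  def main(args: Array[String]): Unit = {
    // Placeholder IDs: substitute an application and stage known to your history server.
    val appId = "app-20190301120000-0000"
    val stageId = 3
    val url = s"http://localhost:18080/api/v1/applications/$appId/stages/$stageId"

    // The endpoint returns a JSON array of stage attempts; after this change each
    // object also carries fields such as "jvmGcTime", "peakExecutionMemory" and
    // "shuffleWriteTime" alongside the existing aggregates.
    println(Source.fromURL(url).mkString)
  }
}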
81 changes: 47 additions & 34 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -462,40 +462,53 @@ private[spark] class AppStatusStore(
.toMap

new v1.StageData(
stage.status,
stage.stageId,
stage.attemptId,
stage.numTasks,
stage.numActiveTasks,
stage.numCompleteTasks,
stage.numFailedTasks,
stage.numKilledTasks,
stage.numCompletedIndices,
stage.executorRunTime,
stage.executorCpuTime,
stage.submissionTime,
stage.firstTaskLaunchedTime,
stage.completionTime,
stage.failureReason,
stage.inputBytes,
stage.inputRecords,
stage.outputBytes,
stage.outputRecords,
stage.shuffleReadBytes,
stage.shuffleReadRecords,
stage.shuffleWriteBytes,
stage.shuffleWriteRecords,
stage.memoryBytesSpilled,
stage.diskBytesSpilled,
stage.name,
stage.description,
stage.details,
stage.schedulingPool,
stage.rddIds,
stage.accumulatorUpdates,
Some(tasks),
Some(executorSummary(stage.stageId, stage.attemptId)),
stage.killedTasksSummary)
status = stage.status,
stageId = stage.stageId,
attemptId = stage.attemptId,
numTasks = stage.numTasks,
numActiveTasks = stage.numActiveTasks,
numCompleteTasks = stage.numCompleteTasks,
numFailedTasks = stage.numFailedTasks,
numKilledTasks = stage.numKilledTasks,
numCompletedIndices = stage.numCompletedIndices,
submissionTime = stage.submissionTime,
firstTaskLaunchedTime = stage.firstTaskLaunchedTime,
completionTime = stage.completionTime,
failureReason = stage.failureReason,
executorDeserializeTime = stage.executorDeserializeTime,
executorDeserializeCpuTime = stage.executorDeserializeCpuTime,
executorRunTime = stage.executorRunTime,
executorCpuTime = stage.executorCpuTime,
resultSize = stage.resultSize,
jvmGcTime = stage.jvmGcTime,
resultSerializationTime = stage.resultSerializationTime,
memoryBytesSpilled = stage.memoryBytesSpilled,
diskBytesSpilled = stage.diskBytesSpilled,
peakExecutionMemory = stage.peakExecutionMemory,
inputBytes = stage.inputBytes,
inputRecords = stage.inputRecords,
outputBytes = stage.outputBytes,
outputRecords = stage.outputRecords,
shuffleRemoteBlocksFetched = stage.shuffleRemoteBlocksFetched,
shuffleLocalBlocksFetched = stage.shuffleLocalBlocksFetched,
shuffleFetchWaitTime = stage.shuffleFetchWaitTime,
shuffleRemoteBytesRead = stage.shuffleRemoteBytesRead,
shuffleRemoteBytesReadToDisk = stage.shuffleRemoteBytesReadToDisk,
shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
shuffleReadBytes = stage.shuffleReadBytes,
shuffleReadRecords = stage.shuffleReadRecords,
shuffleWriteBytes = stage.shuffleWriteBytes,
shuffleWriteTime = stage.shuffleWriteTime,
shuffleWriteRecords = stage.shuffleWriteRecords,
name = stage.name,
description = stage.description,
details = stage.details,
schedulingPool = stage.schedulingPool,
rddIds = stage.rddIds,
accumulatorUpdates = stage.accumulatorUpdates,
tasks = Some(tasks),
executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
killedTasksSummary = stage.killedTasksSummary)
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
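Besides adding the new fields, the call above switches from positional to named arguments. With roughly forty parameters, most of them Long, a positional call compiles happily even when two metrics are transposed; named arguments make such mistakes visible at the call site. A cut-down, hypothetical illustration (not Spark code):

// Hypothetical two-field stand-in for a wide constructor like StageData's.
class Metrics(val executorRunTime: Long, val jvmGcTime: Long)

object NamedArgsDemo {
  def main(args: Array[String]): Unit = {
    // Positional: the two Longs are transposed, yet this still compiles.
    val wrong = new Metrics(27L, 761L)
    // Named: order-independent and self-documenting.
    val right = new Metrics(executorRunTime = 761L, jvmGcTime = 27L)
    println(s"wrong run=${wrong.executorRunTime} ms, right run=${right.executorRunTime} ms")
  }
}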
92 changes: 53 additions & 39 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -394,45 +394,59 @@ private class LiveStage extends LiveEntity {

def toApi(): v1.StageData = {
new v1.StageData(
status,
info.stageId,
info.attemptNumber,

info.numTasks,
activeTasks,
completedTasks,
failedTasks,
killedTasks,
completedIndices.size,

metrics.executorRunTime,
metrics.executorCpuTime,
info.submissionTime.map(new Date(_)),
if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
info.completionTime.map(new Date(_)),
info.failureReason,

metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,

info.name,
description,
info.details,
schedulingPool,

info.rddInfos.map(_.id),
newAccumulatorInfos(info.accumulables.values),
None,
None,
killedSummary)
status = status,
stageId = info.stageId,
attemptId = info.attemptNumber,
numTasks = info.numTasks,
numActiveTasks = activeTasks,
numCompleteTasks = completedTasks,
numFailedTasks = failedTasks,
numKilledTasks = killedTasks,
numCompletedIndices = completedIndices.size,

submissionTime = info.submissionTime.map(new Date(_)),
firstTaskLaunchedTime =
if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
completionTime = info.completionTime.map(new Date(_)),
failureReason = info.failureReason,

executorDeserializeTime = metrics.executorDeserializeTime,
executorDeserializeCpuTime = metrics.executorDeserializeCpuTime,
executorRunTime = metrics.executorRunTime,
executorCpuTime = metrics.executorCpuTime,
resultSize = metrics.resultSize,
jvmGcTime = metrics.jvmGcTime,
resultSerializationTime = metrics.resultSerializationTime,
memoryBytesSpilled = metrics.memoryBytesSpilled,
diskBytesSpilled = metrics.diskBytesSpilled,
peakExecutionMemory = metrics.peakExecutionMemory,
inputBytes = metrics.inputMetrics.bytesRead,
inputRecords = metrics.inputMetrics.recordsRead,
outputBytes = metrics.outputMetrics.bytesWritten,
outputRecords = metrics.outputMetrics.recordsWritten,
shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched,
shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched,
shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime,
shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead,
shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk,
shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead,
shuffleReadBytes =
metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
shuffleReadRecords = metrics.shuffleReadMetrics.recordsRead,
shuffleWriteBytes = metrics.shuffleWriteMetrics.bytesWritten,
shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime,
shuffleWriteRecords = metrics.shuffleWriteMetrics.recordsWritten,

name = info.name,
description = description,
details = info.details,
schedulingPool = schedulingPool,

rddIds = info.rddInfos.map(_.id),
accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
tasks = None,
executorSummary = None,
killedTasksSummary = killedSummary)
}

override protected def doUpdate(): Any = {
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -210,23 +210,36 @@ class StageData private[spark](
val numKilledTasks: Int,
val numCompletedIndices: Int,

val executorRunTime: Long,
val executorCpuTime: Long,
val submissionTime: Option[Date],
val firstTaskLaunchedTime: Option[Date],
val completionTime: Option[Date],
val failureReason: Option[String],

val executorDeserializeTime: Long,
val executorDeserializeCpuTime: Long,
val executorRunTime: Long,
val executorCpuTime: Long,
val resultSize: Long,
val jvmGcTime: Long,
val resultSerializationTime: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val peakExecutionMemory: Long,
val inputBytes: Long,
val inputRecords: Long,
val outputBytes: Long,
val outputRecords: Long,
val shuffleRemoteBlocksFetched: Long,
val shuffleLocalBlocksFetched: Long,
val shuffleFetchWaitTime: Long,
val shuffleRemoteBytesRead: Long,
val shuffleRemoteBytesReadToDisk: Long,
val shuffleLocalBytesRead: Long,
val shuffleReadBytes: Long,
val shuffleReadRecords: Long,
val shuffleWriteBytes: Long,
val shuffleWriteTime: Long,
val shuffleWriteRecords: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,

val name: String,
val description: Option[String],
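The REST layer serializes these public vals straight to JSON, so the field names above are exactly the keys that appear in the expectation files further down (e.g. shuffleRemoteBytesReadToDisk). A minimal sketch of that mapping with a cut-down, hypothetical class, assuming jackson-module-scala on the classpath as Spark's API layer uses:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in carrying two of the new fields; StageData itself has ~40.
class StageLike(val shuffleWriteTime: Long, val peakExecutionMemory: Long)

object JsonShapeDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // Prints something like {"shuffleWriteTime":8711515,"peakExecutionMemory":0}
    println(mapper.writeValueAsString(new StageLike(8711515L, 0L)))
  }
}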
65 changes: 51 additions & 14 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,20 +203,57 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
// stage or if the stage information has been garbage collected
store.asOption(store.lastStageAttempt(stageId)).getOrElse {
new v1.StageData(
v1.StageStatus.PENDING,
stageId,
0, 0, 0, 0, 0, 0, 0,
0L, 0L, None, None, None, None,
0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
"Unknown",
None,
"Unknown",
null,
Nil,
Nil,
None,
None,
Map())
status = v1.StageStatus.PENDING,
stageId = stageId,
attemptId = 0,
numTasks = 0,
numActiveTasks = 0,
numCompleteTasks = 0,
numFailedTasks = 0,
numKilledTasks = 0,
numCompletedIndices = 0,

submissionTime = None,
firstTaskLaunchedTime = None,
completionTime = None,
failureReason = None,

executorDeserializeTime = 0L,
executorDeserializeCpuTime = 0L,
executorRunTime = 0L,
executorCpuTime = 0L,
resultSize = 0L,
jvmGcTime = 0L,
resultSerializationTime = 0L,
memoryBytesSpilled = 0L,
diskBytesSpilled = 0L,
peakExecutionMemory = 0L,
inputBytes = 0L,
inputRecords = 0L,
outputBytes = 0L,
outputRecords = 0L,
shuffleRemoteBlocksFetched = 0L,
shuffleLocalBlocksFetched = 0L,
shuffleFetchWaitTime = 0L,
shuffleRemoteBytesRead = 0L,
shuffleRemoteBytesReadToDisk = 0L,
shuffleLocalBytesRead = 0L,
shuffleReadBytes = 0L,
shuffleReadRecords = 0L,
shuffleWriteBytes = 0L,
shuffleWriteTime = 0L,
shuffleWriteRecords = 0L,

name = "Unknown",
description = None,
details = "Unknown",
schedulingPool = null,

rddIds = Nil,
accumulatorUpdates = Nil,
tasks = None,
executorSummary = None,
killedTasksSummary = Map())
}
}

@@ -8,21 +8,34 @@
"numFailedTasks" : 2,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"submissionTime" : "2018-01-09T10:21:18.152GMT",
"firstTaskLaunchedTime" : "2018-01-09T10:21:18.347GMT",
"completionTime" : "2018-01-09T10:21:19.062GMT",
"executorDeserializeTime" : 327,
"executorDeserializeCpuTime" : 225900000,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"resultSize" : 10376,
"jvmGcTime" : 27,
"resultSerializationTime" : 1,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 460,
"shuffleWriteTime" : 8711515,
"shuffleWriteRecords" : 10,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:26",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:34)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:36)\n$line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:38)\n$line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line17.$read$$iw$$iw$$iw.<init>(<console>:42)\n$line17.$read$$iw$$iw.<init>(<console>:44)\n$line17.$read$$iw.<init>(<console>:46)\n$line17.$read.<init>(<console>:48)\n$line17.$read$.<init>(<console>:52)\n$line17.$read$.<clinit>(<console>)\n$line17.$eval$.$print$lzycompute(<console>:7)\n$line17.$eval$.$print(<console>:6)\n$line17.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
@@ -8,21 +8,34 @@
"numFailedTasks" : 4,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"submissionTime" : "2018-01-18T18:33:12.658GMT",
"firstTaskLaunchedTime" : "2018-01-18T18:33:12.816GMT",
"completionTime" : "2018-01-18T18:33:15.279GMT",
"executorDeserializeTime" : 3679,
"executorDeserializeCpuTime" : 1029819716,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"resultSize" : 10824,
"jvmGcTime" : 370,
"resultSerializationTime" : 5,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 1461,
"shuffleWriteTime" : 33251697,
"shuffleWriteRecords" : 30,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:27",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line15.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line15.$read$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line15.$read$$iw$$iw$$iw.<init>(<console>:43)\n$line15.$read$$iw$$iw.<init>(<console>:45)\n$line15.$read$$iw.<init>(<console>:47)\n$line15.$read.<init>(<console>:49)\n$line15.$read$.<init>(<console>:53)\n$line15.$read$.<clinit>(<console>)\n$line15.$eval$.$print$lzycompute(<console>:7)\n$line15.$eval$.$print(<console>:6)\n$line15.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
@@ -864,4 +877,4 @@
}
},
"killedTasksSummary" : { }
}
}
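One caveat when reading the new aggregates: the sample values above suggest mixed units, with wall-clock times such as executorRunTime reported in milliseconds while executorCpuTime, executorDeserializeCpuTime and shuffleWriteTime are reported in nanoseconds (8711515 ns is roughly 8.7 ms). A small sketch normalizing them, using the numbers from the first expectation file:

import java.util.concurrent.TimeUnit

object StageTimeUnits {
  def main(args: Array[String]): Unit = {
    // Values copied from the expectation file above.
    val executorRunTimeMs  = 761L        // milliseconds
    val executorCpuTimeNs  = 269916000L  // nanoseconds
    val shuffleWriteTimeNs = 8711515L    // nanoseconds

    val cpuMs   = TimeUnit.NANOSECONDS.toMillis(executorCpuTimeNs)   // ~269 ms
    val writeMs = TimeUnit.NANOSECONDS.toMillis(shuffleWriteTimeNs)  // ~8 ms
    println(s"run=$executorRunTimeMs ms, cpu=$cpuMs ms, shuffleWrite=$writeMs ms")
  }
}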