Inlined TaskMetrics into StageData
tomvanbussel committed Apr 1, 2019
1 parent aa6752a commit 4974fa0
Showing 14 changed files with 424 additions and 152 deletions.
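
In short: the per-stage aggregates that callers previously reached through a separate task-metrics object are flattened into top-level fields of v1.StageData. A minimal sketch of the shape of the change, with simplified stand-in class names and only a handful of the roughly forty fields (the real definitions appear in the diffs below):

// Sketch only: simplified stand-ins for the real org.apache.spark.status.api.v1 classes.

class StageDataBefore( // before: only a few aggregates were exposed directly
  val stageId: Int,
  val executorRunTime: Long,
  val executorCpuTime: Long,
  val shuffleReadBytes: Long,
  val shuffleWriteBytes: Long)

class StageDataAfter( // after: every aggregated task metric is an inline field
  val stageId: Int,
  val executorRunTime: Long,
  val executorCpuTime: Long,
  val executorDeserializeTime: Long,
  val jvmGcTime: Long,
  val peakExecutionMemory: Long,
  val shuffleFetchWaitTime: Long,
  val shuffleWriteTime: Long)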
81 changes: 47 additions & 34 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -462,40 +462,53 @@ private[spark] class AppStatusStore(
.toMap

new v1.StageData(
- stage.status,
- stage.stageId,
- stage.attemptId,
- stage.numTasks,
- stage.numActiveTasks,
- stage.numCompleteTasks,
- stage.numFailedTasks,
- stage.numKilledTasks,
- stage.numCompletedIndices,
- stage.executorRunTime,
- stage.executorCpuTime,
- stage.submissionTime,
- stage.firstTaskLaunchedTime,
- stage.completionTime,
- stage.failureReason,
- stage.inputBytes,
- stage.inputRecords,
- stage.outputBytes,
- stage.outputRecords,
- stage.shuffleReadBytes,
- stage.shuffleReadRecords,
- stage.shuffleWriteBytes,
- stage.shuffleWriteRecords,
- stage.memoryBytesSpilled,
- stage.diskBytesSpilled,
- stage.name,
- stage.description,
- stage.details,
- stage.schedulingPool,
- stage.rddIds,
- stage.accumulatorUpdates,
- Some(tasks),
- Some(executorSummary(stage.stageId, stage.attemptId)),
- stage.killedTasksSummary)
+ status = stage.status,
+ stageId = stage.stageId,
+ attemptId = stage.attemptId,
+ numTasks = stage.numTasks,
+ numActiveTasks = stage.numActiveTasks,
+ numCompleteTasks = stage.numCompleteTasks,
+ numFailedTasks = stage.numFailedTasks,
+ numKilledTasks = stage.numKilledTasks,
+ numCompletedIndices = stage.numCompletedIndices,
+ submissionTime = stage.submissionTime,
+ firstTaskLaunchedTime = stage.firstTaskLaunchedTime,
+ completionTime = stage.completionTime,
+ failureReason = stage.failureReason,
+ executorDeserializeTime = stage.executorDeserializeTime,
+ executorDeserializeCpuTime = stage.executorDeserializeCpuTime,
+ executorRunTime = stage.executorRunTime,
+ executorCpuTime = stage.executorCpuTime,
+ resultSize = stage.resultSize,
+ jvmGcTime = stage.jvmGcTime,
+ resultSerializationTime = stage.resultSerializationTime,
+ memoryBytesSpilled = stage.memoryBytesSpilled,
+ diskBytesSpilled = stage.diskBytesSpilled,
+ peakExecutionMemory = stage.peakExecutionMemory,
+ inputBytes = stage.inputBytes,
+ inputRecords = stage.inputRecords,
+ outputBytes = stage.outputBytes,
+ outputRecords = stage.outputRecords,
+ shuffleRemoteBlocksFetched = stage.shuffleRemoteBlocksFetched,
+ shuffleLocalBlocksFetched = stage.shuffleLocalBlocksFetched,
+ shuffleFetchWaitTime = stage.shuffleFetchWaitTime,
+ shuffleRemoteBytesRead = stage.shuffleRemoteBytesRead,
+ shuffleRemoteBytesReadToDisk = stage.shuffleRemoteBytesReadToDisk,
+ shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
+ shuffleReadBytes = stage.shuffleReadBytes,
+ shuffleReadRecords = stage.shuffleReadRecords,
+ shuffleWriteBytes = stage.shuffleWriteBytes,
+ shuffleWriteTime = stage.shuffleWriteTime,
+ shuffleWriteRecords = stage.shuffleWriteRecords,
+ name = stage.name,
+ description = stage.description,
+ details = stage.details,
+ schedulingPool = stage.schedulingPool,
+ rddIds = stage.rddIds,
+ accumulatorUpdates = stage.accumulatorUpdates,
+ tasks = Some(tasks),
+ executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
+ killedTasksSummary = stage.killedTasksSummary)
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
92 changes: 53 additions & 39 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -394,45 +394,59 @@ private class LiveStage extends LiveEntity {

def toApi(): v1.StageData = {
new v1.StageData(
- status,
- info.stageId,
- info.attemptNumber,
-
- info.numTasks,
- activeTasks,
- completedTasks,
- failedTasks,
- killedTasks,
- completedIndices.size,
-
- metrics.executorRunTime,
- metrics.executorCpuTime,
- info.submissionTime.map(new Date(_)),
- if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
- info.completionTime.map(new Date(_)),
- info.failureReason,
-
- metrics.inputMetrics.bytesRead,
- metrics.inputMetrics.recordsRead,
- metrics.outputMetrics.bytesWritten,
- metrics.outputMetrics.recordsWritten,
- metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
- metrics.shuffleReadMetrics.recordsRead,
- metrics.shuffleWriteMetrics.bytesWritten,
- metrics.shuffleWriteMetrics.recordsWritten,
- metrics.memoryBytesSpilled,
- metrics.diskBytesSpilled,
-
- info.name,
- description,
- info.details,
- schedulingPool,
-
- info.rddInfos.map(_.id),
- newAccumulatorInfos(info.accumulables.values),
- None,
- None,
- killedSummary)
+ status = status,
+ stageId = info.stageId,
+ attemptId = info.attemptNumber,
+ numTasks = info.numTasks,
+ numActiveTasks = activeTasks,
+ numCompleteTasks = completedTasks,
+ numFailedTasks = failedTasks,
+ numKilledTasks = killedTasks,
+ numCompletedIndices = completedIndices.size,
+
+ submissionTime = info.submissionTime.map(new Date(_)),
+ firstTaskLaunchedTime =
+ if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
+ completionTime = info.completionTime.map(new Date(_)),
+ failureReason = info.failureReason,
+
+ executorDeserializeTime = metrics.executorDeserializeTime,
+ executorDeserializeCpuTime = metrics.executorDeserializeCpuTime,
+ executorRunTime = metrics.executorRunTime,
+ executorCpuTime = metrics.executorCpuTime,
+ resultSize = metrics.resultSize,
+ jvmGcTime = metrics.jvmGcTime,
+ resultSerializationTime = metrics.resultSerializationTime,
+ memoryBytesSpilled = metrics.memoryBytesSpilled,
+ diskBytesSpilled = metrics.diskBytesSpilled,
+ peakExecutionMemory = metrics.peakExecutionMemory,
+ inputBytes = metrics.inputMetrics.bytesRead,
+ inputRecords = metrics.inputMetrics.recordsRead,
+ outputBytes = metrics.outputMetrics.bytesWritten,
+ outputRecords = metrics.outputMetrics.recordsWritten,
+ shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched,
+ shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched,
+ shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime,
+ shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead,
+ shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk,
+ shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead,
+ shuffleReadBytes =
+ metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
+ shuffleReadRecords = metrics.shuffleReadMetrics.recordsRead,
+ shuffleWriteBytes = metrics.shuffleWriteMetrics.bytesWritten,
+ shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime,
+ shuffleWriteRecords = metrics.shuffleWriteMetrics.recordsWritten,
+
+ name = info.name,
+ description = description,
+ details = info.details,
+ schedulingPool = schedulingPool,
+
+ rddIds = info.rddInfos.map(_.id),
+ accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
+ tasks = None,
+ executorSummary = None,
+ killedTasksSummary = killedSummary)
}

override protected def doUpdate(): Any = {
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -210,23 +210,36 @@ class StageData private[spark](
val numKilledTasks: Int,
val numCompletedIndices: Int,

- val executorRunTime: Long,
- val executorCpuTime: Long,
val submissionTime: Option[Date],
val firstTaskLaunchedTime: Option[Date],
val completionTime: Option[Date],
val failureReason: Option[String],

+ val executorDeserializeTime: Long,
+ val executorDeserializeCpuTime: Long,
+ val executorRunTime: Long,
+ val executorCpuTime: Long,
+ val resultSize: Long,
+ val jvmGcTime: Long,
+ val resultSerializationTime: Long,
+ val memoryBytesSpilled: Long,
+ val diskBytesSpilled: Long,
+ val peakExecutionMemory: Long,
val inputBytes: Long,
val inputRecords: Long,
val outputBytes: Long,
val outputRecords: Long,
+ val shuffleRemoteBlocksFetched: Long,
+ val shuffleLocalBlocksFetched: Long,
+ val shuffleFetchWaitTime: Long,
+ val shuffleRemoteBytesRead: Long,
+ val shuffleRemoteBytesReadToDisk: Long,
+ val shuffleLocalBytesRead: Long,
val shuffleReadBytes: Long,
val shuffleReadRecords: Long,
val shuffleWriteBytes: Long,
+ val shuffleWriteTime: Long,
val shuffleWriteRecords: Long,
- val memoryBytesSpilled: Long,
- val diskBytesSpilled: Long,

val name: String,
val description: Option[String],
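
A side effect of the wider constructor is that every call site in this commit switches from positional to named arguments. With dozens of parameters of the same Long type, a positional call still compiles when two values are transposed; named arguments make such a slip visible. A hypothetical two-field illustration (not a Spark class):

class Timings(val runTimeMs: Long, val gcTimeMs: Long)

val transposed = new Timings(27L, 761L) // compiles, but the values are swapped
val explicit = new Timings(runTimeMs = 761L, gcTimeMs = 27L) // intent is on the page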
65 changes: 51 additions & 14 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,20 +203,57 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
// stage or if the stage information has been garbage collected
store.asOption(store.lastStageAttempt(stageId)).getOrElse {
new v1.StageData(
- v1.StageStatus.PENDING,
- stageId,
- 0, 0, 0, 0, 0, 0, 0,
- 0L, 0L, None, None, None, None,
- 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
- "Unknown",
- None,
- "Unknown",
- null,
- Nil,
- Nil,
- None,
- None,
- Map())
+ status = v1.StageStatus.PENDING,
+ stageId = stageId,
+ attemptId = 0,
+ numTasks = 0,
+ numActiveTasks = 0,
+ numCompleteTasks = 0,
+ numFailedTasks = 0,
+ numKilledTasks = 0,
+ numCompletedIndices = 0,
+
+ submissionTime = None,
+ firstTaskLaunchedTime = None,
+ completionTime = None,
+ failureReason = None,
+
+ executorDeserializeTime = 0L,
+ executorDeserializeCpuTime = 0L,
+ executorRunTime = 0L,
+ executorCpuTime = 0L,
+ resultSize = 0L,
+ jvmGcTime = 0L,
+ resultSerializationTime = 0L,
+ memoryBytesSpilled = 0L,
+ diskBytesSpilled = 0L,
+ peakExecutionMemory = 0L,
+ inputBytes = 0L,
+ inputRecords = 0L,
+ outputBytes = 0L,
+ outputRecords = 0L,
+ shuffleRemoteBlocksFetched = 0L,
+ shuffleLocalBlocksFetched = 0L,
+ shuffleFetchWaitTime = 0L,
+ shuffleRemoteBytesRead = 0L,
+ shuffleRemoteBytesReadToDisk = 0L,
+ shuffleLocalBytesRead = 0L,
+ shuffleReadBytes = 0L,
+ shuffleReadRecords = 0L,
+ shuffleWriteBytes = 0L,
+ shuffleWriteTime = 0L,
+ shuffleWriteRecords = 0L,
+
+ name = "Unknown",
+ description = None,
+ details = "Unknown",
+ schedulingPool = null,
+
+ rddIds = Nil,
+ accumulatorUpdates = Nil,
+ tasks = None,
+ executorSummary = None,
+ killedTasksSummary = Map())
}
}

@@ -8,21 +8,34 @@
"numFailedTasks" : 2,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"submissionTime" : "2018-01-09T10:21:18.152GMT",
"firstTaskLaunchedTime" : "2018-01-09T10:21:18.347GMT",
"completionTime" : "2018-01-09T10:21:19.062GMT",
"executorDeserializeTime" : 327,
"executorDeserializeCpuTime" : 225900000,
"executorRunTime" : 761,
"executorCpuTime" : 269916000,
"resultSize" : 10376,
"jvmGcTime" : 27,
"resultSerializationTime" : 1,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 460,
"shuffleWriteTime" : 8711515,
"shuffleWriteRecords" : 10,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:26",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:34)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:36)\n$line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:38)\n$line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line17.$read$$iw$$iw$$iw.<init>(<console>:42)\n$line17.$read$$iw$$iw.<init>(<console>:44)\n$line17.$read$$iw.<init>(<console>:46)\n$line17.$read.<init>(<console>:48)\n$line17.$read$.<init>(<console>:52)\n$line17.$read$.<clinit>(<console>)\n$line17.$eval$.$print$lzycompute(<console>:7)\n$line17.$eval$.$print(<console>:6)\n$line17.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
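
These expectation files pin down the JSON that the history server's REST API serves, so the inlined metrics become part of the public payload. A sketch of fetching one stage for comparison, assuming a history server on its default port 18080 and using placeholder application and stage IDs:

import scala.io.Source

object StagePayloadCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder IDs; substitute a real application ID and stage ID.
    val url = "http://localhost:18080/api/v1/applications/<app-id>/stages/0"
    val body = Source.fromURL(url).mkString
    // The response now includes the inlined aggregates, e.g. "shuffleWriteTime",
    // "peakExecutionMemory", and "executorDeserializeTime".
    println(body)
  }
}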
@@ -8,21 +8,34 @@
"numFailedTasks" : 4,
"numKilledTasks" : 0,
"numCompletedIndices" : 10,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"submissionTime" : "2018-01-18T18:33:12.658GMT",
"firstTaskLaunchedTime" : "2018-01-18T18:33:12.816GMT",
"completionTime" : "2018-01-18T18:33:15.279GMT",
"executorDeserializeTime" : 3679,
"executorDeserializeCpuTime" : 1029819716,
"executorRunTime" : 5080,
"executorCpuTime" : 1163210819,
"resultSize" : 10824,
"jvmGcTime" : 370,
"resultSerializationTime" : 5,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 1461,
"shuffleWriteTime" : 33251697,
"shuffleWriteRecords" : 30,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "map at <console>:27",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:370)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line15.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line15.$read$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line15.$read$$iw$$iw$$iw.<init>(<console>:43)\n$line15.$read$$iw$$iw.<init>(<console>:45)\n$line15.$read$$iw.<init>(<console>:47)\n$line15.$read.<init>(<console>:49)\n$line15.$read$.<init>(<console>:53)\n$line15.$read$.<clinit>(<console>)\n$line15.$eval$.$print$lzycompute(<console>:7)\n$line15.$eval$.$print(<console>:6)\n$line15.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)",
"schedulingPool" : "default",
@@ -864,4 +877,4 @@
}
},
"killedTasksSummary" : { }
- }
+ }