
address feedback to merge two tables together
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
cindyyuanjiang committed Oct 9, 2024
1 parent 6aefee9 commit 5a56d30
Showing 11 changed files with 60 additions and 145 deletions.
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticTaskMetricsProfileResult, StageDiagnosticAggMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -32,7 +32,6 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes that contains the maximum input size
* @param stageDiagnosticsAggs the aggregated stage level diagnostic Spark metrics
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
*/
case class AggRawMetricsResult(
@@ -43,5 +42,4 @@ case class AggRawMetricsResult(
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnosticsAggs: Seq[StageDiagnosticAggMetricsProfileResult],
stageDiagnostics: Seq[StageDiagnosticTaskMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])
@@ -39,7 +39,6 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateStageAggDiagnosticsSparkMetrics(index),
analysisObj.aggregateDiagnosticSparkMetricsByStage(index))
}

@@ -62,7 +61,6 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes,
agg1.stageDiagnosticsAggs ++ agg2.stageDiagnosticsAggs,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
}
}
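
The combine step above merges two per-app results by concatenating each field, and folding that over many applications yields a single aggregate. Below is an illustrative, self-contained sketch of the same pattern with a simplified result type; the type, field, and helper names here (SimpleAggResult, combine, combineAll) are assumptions for the example, not APIs from this codebase.

object AggCombineSketch {
  // Simplified stand-in for AggRawMetricsResult: two of the real sequences.
  case class SimpleAggResult(
      stageAggs: Seq[String],
      stageDiagnostics: Seq[String])

  val empty: SimpleAggResult = SimpleAggResult(Seq.empty, Seq.empty)

  // Field-by-field concatenation, mirroring the ++ pattern in AppSparkMetricsAggTrait.
  def combine(a: SimpleAggResult, b: SimpleAggResult): SimpleAggResult =
    SimpleAggResult(
      a.stageAggs ++ b.stageAggs,
      a.stageDiagnostics ++ b.stageDiagnostics)

  // Fold per-app results into a single aggregate.
  def combineAll(perApp: Seq[SimpleAggResult]): SimpleAggResult =
    perApp.foldLeft(empty)(combine)
}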
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.analysis

import scala.collection.mutable

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticTaskMetricsProfileResult, StageDiagnosticAggMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
@@ -323,13 +323,15 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @param index the App-index (used by the profiler tool)
* @return sequence of StageDiagnosticMetricsProfileResult
*/
def aggregateDiagnosticSparkMetricsByStage(index: Int): Seq[StageDiagnosticTaskMetricsProfileResult] = {
def aggregateDiagnosticSparkMetricsByStage(index: Int):
Seq[StageDiagnosticMetricsProfileResult] = {
// TODO: this has stage attempts. we should handle different attempts
val rows = app.stageManager.getAllStages.map { sm =>
System.out.println("In aggregateDiagnosticSparkMetricsByStage!\n")
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (diskSpilledMin, diskSpilledMed, diskSpilledMax, diskSpilledSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.diskBytesSpilled))
val (memSpilledMin, memSpilledMed, memSpilledMax, memSpilledSum) =
@@ -346,13 +348,14 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_fetchWaitTime))
val (swWriteTimeMin, swWriteTimeMed, swWriteTimeMax, swWriteTimeSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sw_writeTime))
val nodeNames = app.asInstanceOf[ApplicationInfo].planMetricProcessor.stageToNodeNames.getOrElse(sm.stageInfo.stageId, Seq.empty[String])
StageDiagnosticTaskMetricsProfileResult(index,
val nodeNames = app.asInstanceOf[ApplicationInfo].planMetricProcessor.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
StageDiagnosticMetricsProfileResult(index,
app.appId,
app.getAppName,
sm.stageInfo.stageId,
sm.duration,
tasksInStage.size, // TODO: why is this numAttempts and not numTasks?
numAttempts, // TODO: why is this numAttempts and not numTasks?
memSpilledMin / (1024 * 1024),
memSpilledMed / (1024 * 1024),
memSpilledMax / (1024 * 1024),
@@ -440,29 +443,6 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
}

def aggregateStageAggDiagnosticsSparkMetrics(index: Int): Seq[StageDiagnosticAggMetricsProfileResult] = {
// TODO: this has stage attempts. we should handle different attempts
val rows = app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())

val nodeNames = app.asInstanceOf[ApplicationInfo].planMetricProcessor.stageToNodeNames.getOrElse(sm.stageInfo.stageId, Seq.empty[String])

StageDiagnosticAggMetricsProfileResult(index,
app.appId,
app.getAppName,
sm.stageInfo.stageId,
sm.duration,
tasksInStage.size, // TODO: why is this numAttempts and not numTasks?
tasksInStage.map(_.memoryBytesSpilled).sum / (1024 * 1024),
tasksInStage.map(_.diskBytesSpilled).sum / (1024 * 1024),
nodeNames
)
}
rows.toSeq
}
}
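
The method above repeatedly destructures a four-value tuple per task metric. As a rough, self-contained sketch of what a getStatistics-style helper can look like, assuming it returns (min, median, max, sum) over Long values; the real AppSparkMetricsAnalyzer.getStatistics may differ in signature, empty-input handling, and median calculation:

object StatisticsSketch {
  // (min, median, max, sum) over a collection of Long metric values.
  // Empty input yields zeros; the median here is the simple middle element
  // (no interpolation), which is an assumption for this example.
  def getStatistics(values: Iterable[Long]): (Long, Long, Long, Long) = {
    if (values.isEmpty) {
      (0L, 0L, 0L, 0L)
    } else {
      val sorted = values.toSeq.sorted
      val median = sorted(sorted.size / 2)
      (sorted.head, median, sorted.last, sorted.sum)
    }
  }

  // Usage mirroring the per-stage aggregation above, e.g. for disk spill:
  //   val (diskSpilledMin, diskSpilledMed, diskSpilledMax, diskSpilledSum) =
  //     getStatistics(tasksInStage.map(_.diskBytesSpilled))
}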


@@ -48,8 +48,7 @@ case class ApplicationSummaryInfo(
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
stageDiagnosticsAggs: Seq[StageDiagnosticAggMetricsProfileResult],
stageDiagnostics: Seq[StageDiagnosticTaskMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
@@ -185,9 +185,7 @@ case class AppStatusResult(
Seq(path, status, appId, message)
}

override def convertToCSVSeq: Seq[String] = {
Seq(path, status, appId, message)
}
override def convertToCSVSeq: Seq[String] = convertToSeq
}

// note that some things might not be set until after sqlMetricsAggregation called
@@ -501,40 +499,7 @@ trait BaseJobStageAggTaskMetricsProfileResult extends ProfileResult {
swWriteTimeSum.toString)
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString,
id.toString,
numTasks.toString,
durStr,
diskBytesSpilledSum.toString,
durationSum.toString,
durationMax.toString,
durationMin.toString,
durationAvg.toString,
executorCPUTimeSum.toString,
executorDeserializeCpuTimeSum.toString,
executorDeserializeTimeSum.toString,
executorRunTimeSum.toString,
inputBytesReadSum.toString,
inputRecordsReadSum.toString,
jvmGCTimeSum.toString,
memoryBytesSpilledSum.toString,
outputBytesWrittenSum.toString,
outputRecordsWrittenSum.toString,
peakExecutionMemoryMax.toString,
resultSerializationTimeSum.toString,
resultSizeMax.toString,
srFetchWaitTimeSum.toString,
srLocalBlocksFetchedSum.toString,
srcLocalBytesReadSum.toString,
srRemoteBlocksFetchSum.toString,
srRemoteBytesReadSum.toString,
srRemoteBytesReadToDiskSum.toString,
srTotalBytesReadSum.toString,
swBytesWrittenSum.toString,
swRecordsWrittenSum.toString,
swWriteTimeSum.toString)
}
override def convertToCSVSeq: Seq[String] = convertToSeq
}

case class JobAggTaskMetricsProfileResult(
@@ -609,7 +574,7 @@ case class StageAggTaskMetricsProfileResult(
override def idHeader = "stageId"
}

case class StageDiagnosticTaskMetricsProfileResult(
case class StageDiagnosticMetricsProfileResult(
appIndex: Int,
appId: String,
appName: String,
@@ -709,54 +674,50 @@ case class StageDiagnosticTaskMetricsProfileResult(
swWriteTimeMed.toString,
swWriteTimeMax.toString,
swWriteTimeSum.toString,
StringUtils.reformatCSVString(nodeNames.mkString(",")))
}

override def convertToCSVSeq: Seq[String] = convertToSeq
}

case class StageDiagnosticAggMetricsProfileResult(
appIndex: Int,
appId: String,
appName: String,
id: Long,
duration: Option[Long],
numTasks: Int,
memoryBytesSpilledSumMB: Long,
diskBytesSpilledSumMB: Long,
nodeNames: Seq[String]) extends ProfileResult {

val durStr = duration match {
case Some(dur) => dur.toString
case None => "null"
}

override val outputHeaders = Seq("appIndex", "appId", "appName",
"stageId", "Duration", "numTasks", "memoryBytesSpilledTotalMB",
"diskBytesSpilledTotalMB", "SQL Nodes(IDs)")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString,
appId,
appName,
id.toString,
durStr,
numTasks.toString,
memoryBytesSpilledSumMB.toString,
diskBytesSpilledSumMB.toString,
nodeNames.mkString(","))
}

override def convertToCSVSeq: Seq[String] =
override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString,
appId,
appName,
id.toString,
durStr,
numTasks.toString,
memoryBytesSpilledSumMB.toString,
diskBytesSpilledSumMB.toString,
memoryBytesSpilledMBMin.toString,
memoryBytesSpilledMBMed.toString,
memoryBytesSpilledMBMax.toString,
memoryBytesSpilledMBSum.toString,
diskBytesSpilledMBMin.toString,
diskBytesSpilledMBMed.toString,
diskBytesSpilledMBMax.toString,
diskBytesSpilledMBSum.toString,
inputBytesReadMin.toString,
inputBytesReadMed.toString,
inputBytesReadMax.toString,
inputBytesReadSum.toString,
outputBytesWrittenMin.toString,
outputBytesWrittenMed.toString,
outputBytesWrittenMax.toString,
outputBytesWrittenSum.toString,
srTotalBytesReadMin.toString,
srTotalBytesReadMed.toString,
srTotalBytesReadMax.toString,
srTotalBytesReadSum.toString,
swBytesWrittenMin.toString,
swBytesWrittenMed.toString,
swBytesWrittenMax.toString,
swBytesWrittenSum.toString,
srFetchWaitTimeMin.toString,
srFetchWaitTimeMed.toString,
srFetchWaitTimeMax.toString,
srFetchWaitTimeSum.toString,
swWriteTimeMin.toString,
swWriteTimeMed.toString,
swWriteTimeMax.toString,
swWriteTimeSum.toString,
StringUtils.reformatCSVString(nodeNames.mkString(",")))
}
}
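
Several hunks in this file collapse a duplicated CSV conversion into convertToCSVSeq = convertToSeq. A minimal, self-contained sketch of that ProfileResult pattern follows; the trait shape is simplified and assumed, and the diagnostics row above keeps a distinct CSV body only because its comma-joined node names need CSV escaping (via StringUtils.reformatCSVString).

// Simplified sketch of the ProfileResult pattern; the real trait has more members.
trait SimpleProfileResult {
  def outputHeaders: Seq[String]
  def convertToSeq: Seq[String]
  // Default: the CSV row equals the plain text row, so overriding is only
  // needed when a field requires quoting or escaping.
  def convertToCSVSeq: Seq[String] = convertToSeq
}

case class SimpleStageRow(stageId: Long, numTasks: Int, nodeNames: Seq[String])
    extends SimpleProfileResult {
  override val outputHeaders = Seq("stageId", "numTasks", "SQL Nodes(IDs)")

  override def convertToSeq: Seq[String] =
    Seq(stageId.toString, numTasks.toString, nodeNames.mkString(","))

  // Only the comma-joined node names need CSV quoting here; everything else
  // can reuse the plain representation.
  override def convertToCSVSeq: Seq[String] =
    Seq(stageId.toString, numTasks.toString, "\"" + nodeNames.mkString(",") + "\"")
}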

case class SQLMaxTaskInputSizes(
@@ -395,7 +395,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, sqlToStages, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo,
analysis.stageDiagnosticsAggs, analysis.stageDiagnostics), compareRes)
analysis.stageDiagnostics), compareRes)
}

/**
@@ -492,8 +492,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
appsSum.flatMap(_.sparkRapidsBuildInfo),
appsSum.flatMap(_.stageDiagnosticsAggs),
appsSum.flatMap(_.stageDiagnostics).sortBy(_.duration)(Ordering[Option[Long]].reverse)
appsSum.flatMap(_.stageDiagnostics)
)
Seq(reduced)
} else {
@@ -541,7 +540,6 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val skewTableDesc = AGG_DESCRIPTION(TASK_SHUFFLE_SKEW)
profileOutputWriter.write(skewHeader, app.skewInfo, tableDesc = Some(skewTableDesc))
profileOutputWriter.write(STAGE_DIAGNOSTICS_LABEL, app.stageDiagnostics)
profileOutputWriter.write(STAGE_AGGS_DIAGNOSTICS_LABEL, app.stageDiagnosticsAggs)

profileOutputWriter.writeText("\n### C. Health Check###\n")
profileOutputWriter.write(ProfFailedTaskView.getLabel, app.failedTasks)
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.profiling.{BaseJobStageAggTaskMetricsProfileResult, IOAnalysisProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageDiagnosticTaskMetricsProfileResult, StageDiagnosticAggMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{BaseJobStageAggTaskMetricsProfileResult, IOAnalysisProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageDiagnosticMetricsProfileResult}

/**
* Contains the sort logic for the aggregated Spark RawMetrics.
@@ -92,22 +92,9 @@ object AggMetricsResultSorter {
}
}

def sortStageDiagnosticsAggs(
rows: Seq[StageDiagnosticAggMetricsProfileResult]):
Seq[StageDiagnosticAggMetricsProfileResult] = {
if (rows.isEmpty) {
Seq.empty
} else {
rows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
(cols.appIndex, -sortDur, -cols.memoryBytesSpilledSumMB, cols.appName)
}
}
}

def sortStageDiagnostics(
rows: Seq[StageDiagnosticTaskMetricsProfileResult]):
Seq[StageDiagnosticTaskMetricsProfileResult] = {
rows: Seq[StageDiagnosticMetricsProfileResult]):
Seq[StageDiagnosticMetricsProfileResult] = {
if (rows.isEmpty) {
Seq.empty
} else {
@@ -37,7 +37,6 @@ object QualRawReportGenerator {
AggMetricsResultSorter.sortIO(aggRawResult.ioAggs),
AggMetricsResultSorter.sortSqlDurationAgg(aggRawResult.sqlDurAggs),
aggRawResult.maxTaskInputSizes,
AggMetricsResultSorter.sortStageDiagnosticsAggs(aggRawResult.stageDiagnosticsAggs),
AggMetricsResultSorter.sortStageDiagnostics(aggRawResult.stageDiagnostics))
Map(
STAGE_AGG_LABEL -> sortedRes.stageAggs,
@@ -17,7 +17,7 @@
package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.analysis.ProfSparkMetricsAnalyzer
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticTaskMetricsProfileResult, StageDiagnosticAggMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

@@ -31,14 +31,11 @@ case class ProfilerAggregatedView(
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnosticsAggs: Seq[StageDiagnosticAggMetricsProfileResult],
stageDiagnostics: Seq[StageDiagnosticTaskMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])

object RawMetricProfilerView {
def getAggMetrics(apps: Seq[ApplicationInfo]): ProfilerAggregatedView = {
val aggMetricsResults = ProfSparkMetricsAnalyzer.getAggregateRawMetrics(apps)
System.out.println("AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics)")
System.out.println(AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics))
ProfilerAggregatedView(
AggMetricsResultSorter.sortJobSparkMetrics(aggMetricsResults.jobAggs),
AggMetricsResultSorter.sortJobSparkMetrics(aggMetricsResults.stageAggs),
@@ -47,7 +44,6 @@ object RawMetricProfilerView {
AggMetricsResultSorter.sortIO(aggMetricsResults.ioAggs),
AggMetricsResultSorter.sortSqlDurationAgg(aggMetricsResults.sqlDurAggs),
aggMetricsResults.maxTaskInputSizes,
AggMetricsResultSorter.sortStageDiagnosticsAggs(aggMetricsResults.stageDiagnosticsAggs),
AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics))
}
}
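
The sorter calls above order each aggregate before it reaches the view. A self-contained sketch of the kind of ordering applied to stage diagnostics follows: ascending appIndex, then descending duration with None treated as zero. The exact sort keys for the merged table are not fully visible in this diff, so treat the tuple below as an assumption.

object DiagnosticsSortSketch {
  // Stand-in row with just the fields needed to show the ordering.
  case class DiagRow(appIndex: Int, appName: String, duration: Option[Long])

  def sortDiagRows(rows: Seq[DiagRow]): Seq[DiagRow] = {
    if (rows.isEmpty) {
      Seq.empty
    } else {
      rows.sortBy { r =>
        val dur = r.duration.getOrElse(0L)
        (r.appIndex, -dur, r.appName) // negate duration for descending order
      }
    }
  }
}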
@@ -28,7 +28,6 @@ package object views {
val IO_LABEL = "IO Metrics"
val SQL_DUR_LABEL = "SQL Duration and Executor CPU Time Percent"
val SQL_MAX_INPUT_SIZE = "SQL Max Task Input Size"
val STAGE_AGGS_DIAGNOSTICS_LABEL = "Stage agg diagnostics metrics"
val STAGE_DIAGNOSTICS_LABEL = "Stage level diagnostic metrics"

val AGG_DESCRIPTION = Map(