Add PhotonPlanParser unit test
Signed-off-by: Partho Sarthi <psarthi@nvidia.com>
parthosa committed Sep 19, 2024
1 parent 941da22 commit cd4e32f
Showing 8 changed files with 136 additions and 64 deletions.
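In short: the shared plan-parser test helpers move out of SQLPlanParserSuite into a new BasePlanParserSuite, a new PhotonPlanParserSuite verifies that Photon operators are parsed as their Spark CPU equivalents, the photon qualification test switches to the nds_q88_photon_db_13_3.zstd event log, and the pre-commit large-file limit is raised from 2.0 MiB to 4.0 MiB.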
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -31,8 +31,8 @@ repos:
     rev: v4.0.1
     hooks:
       - id: check-added-large-files
-        name: Check for file over 2.0MiB
-        args: ['--maxkb=2000', '--enforce-all']
+        name: Check for file over 4.0MiB
+        args: ['--maxkb=4000', '--enforce-all']
       - id: trailing-whitespace
         name: trim trailing white spaces preserving md files
         args: ['--markdown-linebreak-ext=md']
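(The limit bump presumably makes room for the new Photon event-log fixture checked in with this commit; the commit message does not state the reason explicitly.)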
photon_eventlog_expectation.csv (expected qualification output; filename inferred from the test change below)
@@ -1,2 +1,2 @@
App Name,App ID,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds,Photon App
"Spark shell","local-1629446106683",16424.61,1273.38,1910,6475,17698,1910,27.76,"","","","array<struct<city:string,state:string>>;map<string,map<string,string>>","array<struct<city:string,state:string>>;map<string,map<string,string>>","NESTED COMPLEX TYPE",1453,1203,16292,0,6475,false,"","",30,132,true
"Databricks Shell","app-20240919162642-0000",205952.38,195501.61,258770,3858136,401454,258770,75.53,"","","","","","",250542,1810943,598848,0,3858136,true,"Execute AddJarsCommand;Execute CreateViewCommand","",30,6213,true
Binary file not shown.
Binary file not shown.
BasePlanParserSuite.scala (new file)
@@ -0,0 +1,82 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.BaseTestSuite
import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, ToolTestUtils}
import com.nvidia.spark.rapids.tool.qualification._

import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil

class BasePlanParserSuite extends BaseTestSuite {

  val profileLogDir: String = ToolTestUtils.getTestResourcePath("spark-events-profiling")
  val qualLogDir: String = ToolTestUtils.getTestResourcePath("spark-events-qualification")

  // Asserts that `execs` has exactly `size` entries and that every entry is an
  // unsupported leaf exec: speedup factor 1, no children, and (unless
  // checkDurations is disabled) no recorded duration.
  def assertSizeAndNotSupported(size: Int, execs: Seq[ExecInfo],
      checkDurations: Boolean = true): Unit = {
    for (t <- Seq(execs)) {
      assert(t.size == size, t)
      assert(t.forall(_.speedupFactor == 1), t)
      assert(t.forall(_.isSupported == false), t)
      assert(t.forall(_.children.isEmpty), t)
      if (checkDurations) {
        assert(t.forall(_.duration.isEmpty), t)
      }
    }
  }

  // Counterpart of assertSizeAndNotSupported for supported execs. When
  // `expectedDur` is non-empty, the execs' durations must match it exactly.
  def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo],
      expectedDur: Seq[Option[Long]] = Seq.empty, extraText: String = "",
      checkDurations: Boolean = true): Unit = {
    for (t <- Seq(execs)) {
      assert(t.size == size, s"$extraText $t")
      assert(t.forall(_.isSupported == true), s"$extraText $t")
      assert(t.forall(_.children.isEmpty), s"$extraText $t")
      if (expectedDur.nonEmpty) {
        val durations = t.map(_.duration)
        assert(durations.diff(expectedDur).isEmpty,
          s"$extraText durations differ expected ${expectedDur.mkString(",")} " +
            s"but got ${durations.mkString(",")}")
      } else if (checkDurations) {
        assert(t.forall(_.duration.isEmpty), s"$extraText $t")
      }
    }
  }

  // Builds a QualificationAppInfo from a single event log, failing the test if
  // the log cannot be processed.
  def createAppFromEventlog(eventLog: String): QualificationAppInfo = {
    val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
    val (_, allEventLogs) = EventLogPathProcessor.processAllPaths(
      None, None, List(eventLog), hadoopConf)
    val pluginTypeChecker = new PluginTypeChecker()
    assert(allEventLogs.size == 1)
    val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf,
      pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true)
    appResult match {
      case Right(app) => app
      case Left(_) => throw new AssertionError("Cannot create application")
    }
  }

  // Flattens parsed plans into a single sequence containing every top-level
  // exec and its children.
  def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = {
    val topExecInfo = plans.flatMap(_.execInfo)
    topExecInfo.flatMap { e =>
      e.children.getOrElse(Seq.empty) :+ e
    }
  }
}
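For context, a hypothetical suite built on these helpers would follow the shape below; this is a minimal sketch, and the event-log name and the Filter expectation are assumptions rather than part of this commit.

class FilterParserSuite extends BasePlanParserSuite {
  test("Filter execs are parsed as supported") {
    // Hypothetical event log under the qualification test resources.
    val app = createAppFromEventlog(s"$qualLogDir/some_app_eventlog.zstd")
    val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
      SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", new PluginTypeChecker(), app)
    }
    val allExecs = getAllExecsFromPlan(parsedPlans.toSeq)
    // Expect exactly one supported Filter exec; with the default
    // checkDurations = true, its duration must be empty.
    assertSizeAndSupported(1, allExecs.filter(_.exec == "Filter"))
  }
}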
PhotonPlanParserSuite.scala (new file)
@@ -0,0 +1,47 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker


class PhotonPlanParserSuite extends BasePlanParserSuite {

  // scalastyle:off line.size.limit
  // Test cases for Photon nodes. We could add more operators here.
  val photonOpTestCases: Seq[(String, String)] = Seq(
    "PhotonBroadcastNestedLoopJoin" -> "BroadcastNestedLoopJoin", // Case: Photon specific parser
    "PhotonProject" -> "Project", // Case: Fallback to Spark CPU parser
    "PhotonShuffleMapStage" -> "WholeStageCodegen" // Case: WholeStageCodegen operator
  )
  // scalastyle:on line.size.limit

  photonOpTestCases.foreach { case (photonName, sparkName) =>
    test(s"$photonName is parsed as Spark $sparkName") {
      val eventLog = s"$qualLogDir/nds_q88_photon_db_13_3.zstd"
      val pluginTypeChecker = new PluginTypeChecker()
      val app = createAppFromEventlog(eventLog)
      assert(app.sqlPlans.nonEmpty)
      val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
        SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app)
      }
      val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq)
      val reader = allExecInfo.filter(_.exec.contains(sparkName))
      assert(reader.nonEmpty, s"Failed to find $sparkName in $allExecInfo")
    }
  }
}
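Because the cases are data-driven, covering another operator is a one-line addition to photonOpTestCases, for example (a hypothetical mapping, not verified against the parser):

    "PhotonGroupingAgg" -> "HashAggregate" // hypothetical: another fallback case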
SQLPlanParserSuite.scala
@@ -21,8 +21,7 @@ import java.io.{File, PrintWriter}
 import scala.collection.mutable
 import scala.util.control.NonFatal

-import com.nvidia.spark.rapids.BaseTestSuite
-import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, ToolTestUtils}
+import com.nvidia.spark.rapids.tool.ToolTestUtils
 import com.nvidia.spark.rapids.tool.qualification._
 import org.scalatest.Matchers.{be, contain, convertToAnyShouldWrapper}
 import org.scalatest.exceptions.TestFailedException
@@ -32,65 +31,9 @@ import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.rapids.tool.ToolUtils
-import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
-import org.apache.spark.sql.rapids.tool.util.{FSUtils, RapidsToolsConfUtil, ToolsPlanGraph, UTF8Source}
+import org.apache.spark.sql.rapids.tool.util.{FSUtils, ToolsPlanGraph, UTF8Source}

-class SQLPlanParserSuite extends BaseTestSuite {
-
-  private val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
-  private val qualLogDir = ToolTestUtils.getTestResourcePath("spark-events-qualification")
-
-  private def assertSizeAndNotSupported(size: Int, execs: Seq[ExecInfo],
-      checkDurations: Boolean = true): Unit = {
-    for (t <- Seq(execs)) {
-      assert(t.size == size, t)
-      assert(t.forall(_.speedupFactor == 1), t)
-      assert(t.forall(_.isSupported == false), t)
-      assert(t.forall(_.children.isEmpty), t)
-      if (checkDurations) {
-        assert(t.forall(_.duration.isEmpty), t)
-      }
-    }
-  }
-
-  private def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo],
-      expectedDur: Seq[Option[Long]] = Seq.empty, extraText: String = "",
-      checkDurations: Boolean = true): Unit = {
-    for (t <- Seq(execs)) {
-      assert(t.size == size, s"$extraText $t")
-      assert(t.forall(_.isSupported == true), s"$extraText $t")
-      assert(t.forall(_.children.isEmpty), s"$extraText $t")
-      if (expectedDur.nonEmpty) {
-        val durations = t.map(_.duration)
-        assert(durations.diff(expectedDur).isEmpty,
-          s"$extraText durations differ expected ${expectedDur.mkString(",")} " +
-            s"but got ${durations.mkString(",")}")
-      } else if (checkDurations) {
-        assert(t.forall(_.duration.isEmpty), s"$extraText $t")
-      }
-    }
-  }
-
-  private def createAppFromEventlog(eventLog: String): QualificationAppInfo = {
-    val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
-    val (_, allEventLogs) = EventLogPathProcessor.processAllPaths(
-      None, None, List(eventLog), hadoopConf)
-    val pluginTypeChecker = new PluginTypeChecker()
-    assert(allEventLogs.size == 1)
-    val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf,
-      pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true)
-    appResult match {
-      case Right(app) => app
-      case Left(_) => throw new AssertionError("Cannot create application")
-    }
-  }
-
-  private def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = {
-    val topExecInfo = plans.flatMap(_.execInfo)
-    topExecInfo.flatMap { e =>
-      e.children.getOrElse(Seq.empty) :+ e
-    }
-  }
+class SQLPlanParserSuite extends BasePlanParserSuite {

   test("Error parser does not cause entire app to fail") {
     // The purpose of this test is to make sure that the SQLParser won't trigger an exception that
QualificationSuite.scala
@@ -1758,7 +1758,7 @@ class QualificationSuite extends BaseTestSuite {
   }

   test("test support for photon event log") {
-    val logFiles = Array(s"$logDir/photon_eventlog.zstd") // photon event log
+    val logFiles = Array(s"$logDir/nds_q88_photon_db_13_3.zstd") // photon event log
     // Status counts: 1 SUCCESS, 0 FAILURE, 0 SKIPPED, 0 UNKNOWN
     val expectedStatus = Some(StatusReportCounts(1, 0, 0, 0))
     runQualificationTest(logFiles, "photon_eventlog_expectation.csv",
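For reference, runQualificationTest compares the tool's summary output against the named expectation file, so this event-log swap pairs with the expectation CSV update earlier in this diff; StatusReportCounts(1, 0, 0, 0) asserts that the single log is processed successfully.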
