diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 19ab1a9d3..4204d7503 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,8 +31,8 @@ repos: rev: v4.0.1 hooks: - id: check-added-large-files - name: Check for file over 2.0MiB - args: ['--maxkb=2000', '--enforce-all'] + name: Check for file over 4.0MiB + args: ['--maxkb=4000', '--enforce-all'] - id: trailing-whitespace name: trim trailing white spaces preserving md files args: ['--markdown-linebreak-ext=md'] diff --git a/core/src/test/resources/QualificationExpectations/photon_eventlog_expectation.csv b/core/src/test/resources/QualificationExpectations/photon_eventlog_expectation.csv index f76c82b88..052c11bb9 100644 --- a/core/src/test/resources/QualificationExpectations/photon_eventlog_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/photon_eventlog_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly),Total Core Seconds,Photon App -"Spark shell","local-1629446106683",16424.61,1273.38,1910,6475,17698,1910,27.76,"","","","array>;map>","array>;map>","NESTED COMPLEX TYPE",1453,1203,16292,0,6475,false,"","",30,132,true +"Databricks Shell","app-20240919162642-0000",205952.38,195501.61,258770,3858136,401454,258770,75.53,"","","","","","",250542,1810943,598848,0,3858136,true,"Execute AddJarsCommand;Execute CreateViewCommand","",30,6213,true diff --git a/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd b/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd new file mode 100644 index 000000000..505a36b01 Binary files /dev/null and b/core/src/test/resources/spark-events-qualification/nds_q88_photon_db_13_3.zstd differ diff --git a/core/src/test/resources/spark-events-qualification/photon_eventlog.zstd b/core/src/test/resources/spark-events-qualification/photon_eventlog.zstd deleted file mode 100644 index 2ff810859..000000000 Binary files a/core/src/test/resources/spark-events-qualification/photon_eventlog.zstd and /dev/null differ diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala new file mode 100644 index 000000000..3bb35b75f --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import com.nvidia.spark.rapids.BaseTestSuite +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, ToolTestUtils} +import com.nvidia.spark.rapids.tool.qualification._ + +import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo +import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil + +class BasePlanParserSuite extends BaseTestSuite { + + val profileLogDir: String = ToolTestUtils.getTestResourcePath("spark-events-profiling") + val qualLogDir: String = ToolTestUtils.getTestResourcePath("spark-events-qualification") + + def assertSizeAndNotSupported(size: Int, execs: Seq[ExecInfo], + checkDurations: Boolean = true): Unit = { + for (t <- Seq(execs)) { + assert(t.size == size, t) + assert(t.forall(_.speedupFactor == 1), t) + assert(t.forall(_.isSupported == false), t) + assert(t.forall(_.children.isEmpty), t) + if (checkDurations) { + assert(t.forall(_.duration.isEmpty), t) + } + } + } + + def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo], + expectedDur: Seq[Option[Long]] = Seq.empty, extraText: String = "", + checkDurations: Boolean = true): Unit = { + for (t <- Seq(execs)) { + assert(t.size == size, s"$extraText $t") + assert(t.forall(_.isSupported == true), s"$extraText $t") + assert(t.forall(_.children.isEmpty), s"$extraText $t") + if (expectedDur.nonEmpty) { + val durations = t.map(_.duration) + assert(durations.diff(expectedDur).isEmpty, + s"$extraText durations differ expected ${expectedDur.mkString(",")} " + + s"but got ${durations.mkString(",")}") + } else if (checkDurations) { + assert(t.forall(_.duration.isEmpty), s"$extraText $t") + } + } + } + + def createAppFromEventlog(eventLog: String): QualificationAppInfo = { + val hadoopConf = RapidsToolsConfUtil.newHadoopConf() + val (_, allEventLogs) = EventLogPathProcessor.processAllPaths( + None, None, List(eventLog), hadoopConf) + val pluginTypeChecker = new PluginTypeChecker() + assert(allEventLogs.size == 1) + val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, + pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true) + appResult match { + case Right(app) => app + case Left(_) => throw new AssertionError("Cannot create application") + } + } + + def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = { + val topExecInfo = plans.flatMap(_.execInfo) + topExecInfo.flatMap { e => + e.children.getOrElse(Seq.empty) :+ e + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala new file mode 100644 index 000000000..edf8095bc --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.planparser + +import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker + + +class PhotonPlanParserSuite extends BasePlanParserSuite { + + // scalastyle:off line.size.limit + // Test cases for Photon nodes. We could add more operators here. + val photonOpTestCases: Seq[(String, String)] = Seq( + "PhotonBroadcastNestedLoopJoin" -> "BroadcastNestedLoopJoin", // Case: Photon specific parser + "PhotonProject" -> "Project", // Case: Fallback to Spark CPU parser + "PhotonShuffleMapStage" -> "WholeStageCodegen" // Case: WholeStageCodegen operator + ) + // scalastyle:on line.size.limit + + photonOpTestCases.foreach { case (photonName, sparkName) => + test(s"$photonName is parsed as Spark $sparkName") { + val eventLog = s"$qualLogDir/nds_q88_photon_db_13_3.zstd" + val pluginTypeChecker = new PluginTypeChecker() + val app = createAppFromEventlog(eventLog) + assert(app.sqlPlans.nonEmpty) + val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => + SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) + } + val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) + val reader = allExecInfo.filter(_.exec.contains(sparkName)) + assert(reader.nonEmpty, s"Failed to find $sparkName in $allExecInfo") + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index 2f7829abc..dbafd12c1 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -21,8 +21,7 @@ import java.io.{File, PrintWriter} import scala.collection.mutable import scala.util.control.NonFatal -import com.nvidia.spark.rapids.BaseTestSuite -import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, ToolTestUtils} +import com.nvidia.spark.rapids.tool.ToolTestUtils import com.nvidia.spark.rapids.tool.qualification._ import org.scalatest.Matchers.{be, contain, convertToAnyShouldWrapper} import org.scalatest.exceptions.TestFailedException @@ -32,65 +31,9 @@ import org.apache.spark.sql.execution.ui.SQLPlanMetric import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.rapids.tool.ToolUtils -import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo -import org.apache.spark.sql.rapids.tool.util.{FSUtils, RapidsToolsConfUtil, ToolsPlanGraph, UTF8Source} +import org.apache.spark.sql.rapids.tool.util.{FSUtils, ToolsPlanGraph, UTF8Source} -class SQLPlanParserSuite extends BaseTestSuite { - - private val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling") - private val qualLogDir = ToolTestUtils.getTestResourcePath("spark-events-qualification") - - private def assertSizeAndNotSupported(size: Int, execs: Seq[ExecInfo], - checkDurations: Boolean = true): Unit = { - for (t <- Seq(execs)) { - assert(t.size == size, t) - assert(t.forall(_.speedupFactor == 1), t) - assert(t.forall(_.isSupported == false), t) - assert(t.forall(_.children.isEmpty), t) - if (checkDurations) { - assert(t.forall(_.duration.isEmpty), t) - } - } - } - - private def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo], - expectedDur: Seq[Option[Long]] = Seq.empty, extraText: String = "", - checkDurations: Boolean = true): Unit = { - for (t <- Seq(execs)) { - assert(t.size == size, s"$extraText $t") - assert(t.forall(_.isSupported == true), s"$extraText $t") - assert(t.forall(_.children.isEmpty), s"$extraText $t") - if (expectedDur.nonEmpty) { - val durations = t.map(_.duration) - assert(durations.diff(expectedDur).isEmpty, - s"$extraText durations differ expected ${expectedDur.mkString(",")} " + - s"but got ${durations.mkString(",")}") - } else if (checkDurations) { - assert(t.forall(_.duration.isEmpty), s"$extraText $t") - } - } - } - - private def createAppFromEventlog(eventLog: String): QualificationAppInfo = { - val hadoopConf = RapidsToolsConfUtil.newHadoopConf() - val (_, allEventLogs) = EventLogPathProcessor.processAllPaths( - None, None, List(eventLog), hadoopConf) - val pluginTypeChecker = new PluginTypeChecker() - assert(allEventLogs.size == 1) - val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, - pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true) - appResult match { - case Right(app) => app - case Left(_) => throw new AssertionError("Cannot create application") - } - } - - private def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = { - val topExecInfo = plans.flatMap(_.execInfo) - topExecInfo.flatMap { e => - e.children.getOrElse(Seq.empty) :+ e - } - } +class SQLPlanParserSuite extends BasePlanParserSuite { test("Error parser does not cause entire app to fail") { // The purpose of this test is to make sure that the SQLParser won't trigger an exception that diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 417c4b378..2f7441325 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1758,7 +1758,7 @@ class QualificationSuite extends BaseTestSuite { } test("test support for photon event log") { - val logFiles = Array(s"$logDir/photon_eventlog.zstd") // photon event log + val logFiles = Array(s"$logDir/nds_q88_photon_db_13_3.zstd") // photon event log // Status counts: 1 SUCCESS, 0 FAILURE, 0 SKIPPED, 0 UNKNOWN val expectedStatus = Some(StatusReportCounts(1, 0, 0, 0)) runQualificationTest(logFiles, "photon_eventlog_expectation.csv",