Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zjuwangg committed Jan 24, 2025
1 parent 0a1925c commit d584637
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package org.apache.gluten.execution
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.{ApplyResourceProfileExec, ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

@Experimental
class AutoAdjustStageResourceProfileSuite
extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPlanHelper {
Expand Down Expand Up @@ -87,15 +89,15 @@ class AutoAdjustStageResourceProfileSuite
collect(plan) { case c: ApplyResourceProfileExec => c }.size
}

test("stage contains r2c and apply new resource profile") {
test("stage contains fallback nodes and apply new resource profile") {
withSQLConf(
GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false",
GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD.key -> "0.1") {
runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
df =>
val plan = df.queryExecution.executedPlan
// scalastyle:off
// @formatter:off
// format: off
/*
VeloxColumnarToRow
+- ^(7) HashAggregateTransformer(keys=[c1#22], functions=[count(1)], isStreamingAgg=false, output=[c1#22, count(1)#33L])
Expand All @@ -109,7 +111,7 @@ class AutoAdjustStageResourceProfileSuite
+- ^(6) FlushableHashAggregateTransformer(keys=[c1#22], functions=[partial_count(1)], isStreamingAgg=false, output=[c1#22, count#37L])
+- ^(6) FileScanTransformer parquet default.tmp1[c1#22] Batched: true, DataFilters: [],
*/
// @formatter:on
// format: on
// scalastyle:on
assert(collectColumnarShuffleExchange(plan) == 0)
assert(collectShuffleExchange(plan) == 1)
Expand All @@ -127,7 +129,7 @@ class AutoAdjustStageResourceProfileSuite
}
}

test("whole stage fallback") {
test("Apply new resource profile when whole stage fallback") {
withSQLConf(
GlutenConfig.COLUMNAR_FALLBACK_PREFER_COLUMNAR.key -> "false",
GlutenConfig.COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR.key -> "false",
Expand All @@ -139,7 +141,7 @@ class AutoAdjustStageResourceProfileSuite
"java_method('java.lang.Integer', 'signum', tmp1.c1), count(*) " +
"from tmp1 group by java_method('java.lang.Integer', 'signum', tmp1.c1)") {
// scalastyle:off
// @formatter:off
// format: off
/*
DeserializeToObject createexternalrow(java_method(java.lang.Integer, signum, c1)#35.toString, count(1)#36L, StructField(java_method(java.lang.Integer, signum, c1),StringType,true), StructField(count(1),LongType,false)), obj#42: org.apache.spark.sql.Row
+- *(3) HashAggregate(keys=[_nondeterministic#37], functions=[count(1)], output=[java_method(java.lang.Integer, signum, c1)#35, count(1)#36L])
Expand All @@ -152,7 +154,7 @@ class AutoAdjustStageResourceProfileSuite
+- *(1) ColumnarToRow
+- FileScan parquet default.tmp1[c1#22] Batched: true, DataFilters: [], Format: Parquet
*/
// @formatter:on
// format: on
// scalastyle:on
df => assert(collectApplyResourceProfileExec(df.queryExecution.executedPlan) == 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark:
return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
}

// case 1: check whether fallback exists and decide whether to increase heap memory
// todo:
// case 2: check whether fallback exists and decide whether to increase heap memory
val fallenNodeCnt = planNodes.count(p => !p.isInstanceOf[GlutenPlan])
val totalCount = planNodes.size

Expand Down Expand Up @@ -161,7 +160,7 @@ object GlutenAutoAdjustStageResourceProfile extends Logging {
plan.withNewChildren(IndexedSeq(wrapperPlan))
} else {
logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}")
// todo: support setting the InsertInto stage's resource profile
// todo: support setting the final stage's resource profile
plan
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2284,22 +2284,22 @@ object GlutenConfig {
val AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED =
buildStaticConf("spark.gluten.auto.adjustStageResource.enabled")
.internal()
.doc("If enabled, gluten will try to set the stage resource according " +
.doc("Experimental: If enabled, gluten will try to set the stage resource according " +
"to stage execution plan. Only worked when aqe is enabled at the same time!!")
.booleanConf
.createWithDefault(false)

val AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO =
buildConf("spark.gluten.auto.adjustStageResources.heap.ratio")
.internal()
.doc("Increase executor heap memory when match adjust stage resource rule.")
.doc("Experimental: Increase executor heap memory when match adjust stage resource rule.")
.doubleConf
.createWithDefault(2.0d)

val AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD =
buildConf("spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold")
.internal()
.doc("Increase executor heap memory when stage contains fallen node " +
.doc("Experimental: Increase executor heap memory when stage contains fallen node " +
"count exceeds the total node count ratio.")
.doubleConf
.createWithDefault(0.5d)
Expand Down

0 comments on commit d584637

Please sign in to comment.