From 0a1925c1a7e8018594a32e5a716e3f986e57ea67 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 15 Jan 2025 18:03:58 +0800 Subject: [PATCH] address comments --- .../AutoAdjustStageResourceProfileSuite.scala | 2 +- .../GlutenAutoAdjustStageResourceProfile.scala | 6 +++--- .../scala/org/apache/gluten/config/GlutenConfig.scala | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala index 10aa2afcefd9..f4936cca1e26 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala @@ -90,7 +90,7 @@ class AutoAdjustStageResourceProfileSuite test("stage contains r2c and apply new resource profile") { withSQLConf( GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false", - GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD.key -> "0.1") { + GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD.key -> "0.1") { runQueryAndCompare("select c1, count(*) from tmp1 group by c1") { df => val plan = df.queryExecution.executedPlan diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala index 0bfd54ee55bc..56592f5bfcad 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala @@ -93,11 +93,11 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: } // case 1: check whether fallback exists and decide whether increase heap memory - val c2RorR2CCnt = planNodes.count( - p => p.isInstanceOf[ColumnarToRowTransition] || p.isInstanceOf[RowToColumnarTransition]) + // todo: + val fallenNodeCnt = planNodes.count(p => !p.isInstanceOf[GlutenPlan]) val totalCount = planNodes.size - if (1.0 * c2RorR2CCnt / totalCount >= glutenConf.autoAdjustStageC2RorR2CRatioThreshold) { + if (1.0 * fallenNodeCnt / totalCount >= glutenConf.autoAdjustStageFallenNodeThreshold) { val newMemoryAmount = memoryRequest.get.amount * glutenConf.autoAdjustStageRPHeapRatio; val newExecutorMemory = new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong) diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 7619868305ab..4331b2324bd7 100644 --- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -501,8 +501,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def autoAdjustStageRPHeapRatio: Double = getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO) - def autoAdjustStageC2RorR2CRatioThreshold: Double = - getConf(AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD) + def autoAdjustStageFallenNodeThreshold: Double = + getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD) } object GlutenConfig { @@ -2296,10 +2296,10 @@ object GlutenConfig { .doubleConf .createWithDefault(2.0d) - val AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD = - buildConf("spark.gluten.auto.adjustStageResources.c2rORr2c.ratio.threshold") + val AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD = + buildConf("spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold") .internal() - .doc("Increase executor heap memory when stage contains c2r and r2c node " + + .doc("Increase executor heap memory when stage contains fallen node " + "count exceeds the total node count ratio.") .doubleConf .createWithDefault(0.5d)