Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zjuwangg committed Jan 15, 2025
1 parent 5612ffc commit 3c8a767
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AutoAdjustStageResourceProfileSuite
test("stage contains r2c and apply new resource profile") {
withSQLConf(
GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false",
GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD.key -> "0.1") {
GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD.key -> "0.1") {
runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
df =>
val plan = df.queryExecution.executedPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark:
}

// case 1: check whether fallback exists and decide whether increase heap memory
val c2RorR2CCnt = planNodes.count(
p => p.isInstanceOf[ColumnarToRowTransition] || p.isInstanceOf[RowToColumnarTransition])
// todo:
val fallenNodeCnt = planNodes.count(p => !p.isInstanceOf[GlutenPlan])
val totalCount = planNodes.size

if (1.0 * c2RorR2CCnt / totalCount >= glutenConf.autoAdjustStageC2RorR2CRatioThreshold) {
if (1.0 * fallenNodeCnt / totalCount >= glutenConf.autoAdjustStageFallenNodeThreshold) {
val newMemoryAmount = memoryRequest.get.amount * glutenConf.autoAdjustStageRPHeapRatio;
val newExecutorMemory =
new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def autoAdjustStageRPHeapRatio: Double = getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO)

def autoAdjustStageC2RorR2CRatioThreshold: Double =
getConf(AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD)
def autoAdjustStageFallenNodeThreshold: Double =
getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD)
}

object GlutenConfig {
Expand Down Expand Up @@ -2311,10 +2311,10 @@ object GlutenConfig {
.doubleConf
.createWithDefault(2.0d)

val AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_RATIO_THRESHOLD =
buildConf("spark.gluten.auto.adjustStageResources.c2rORr2c.ratio.threshold")
val AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD =
buildConf("spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold")
.internal()
.doc("Increase executor heap memory when stage contains c2r and r2c node " +
.doc("Increase executor heap memory when stage contains fallen node " +
"count exceeds the total node count ratio.")
.doubleConf
.createWithDefault(0.5d)
Expand Down

0 comments on commit 3c8a767

Please sign in to comment.