Skip to content

Commit

Permalink
Account for spark.executor.pyspark.memory in Yunikorn gang scheduling (
Browse files Browse the repository at this point in the history
…kubeflow#2178)

Signed-off-by: Jacob Salway <jacob.salway@gmail.com>
  • Loading branch information
jacobsalway authored Sep 19, 2024
1 parent ed3226e commit a2f71c6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
27 changes: 26 additions & 1 deletion internal/scheduler/yunikorn/resourceusage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ func memoryRequestBytes(podSpec *v1beta2.SparkPodSpec, memoryOverheadFactor floa
return memoryBytes + memoryOverheadBytes, nil
}

func executorPysparkMemoryBytes(app *v1beta2.SparkApplication) (int64, error) {
pysparkMemory, found := app.Spec.SparkConf["spark.executor.pyspark.memory"]
if app.Spec.Type != v1beta2.SparkApplicationTypePython || !found {
return 0, nil
}

// This fields defaults to mebibytes if no resource suffix is specified
// https://github.com/apache/spark/blob/7de71a2ec78d985c2a045f13c1275101b126cec4/docs/configuration.md?plain=1#L289-L305
if _, err := strconv.Atoi(pysparkMemory); err == nil {
pysparkMemory = pysparkMemory + "m"
}

pysparkMemoryBytes, err := byteStringAsBytes(pysparkMemory)
if err != nil {
return 0, nil
}

return pysparkMemoryBytes, nil
}

func bytesToMi(b int64) string {
// this floors the value to the nearest mebibyte
return fmt.Sprintf("%dMi", b/1024/1024)
Expand Down Expand Up @@ -103,6 +123,11 @@ func executorMemoryRequest(app *v1beta2.SparkApplication) (string, error) {
return "", err
}

pysparkMemoryBytes, err := executorPysparkMemoryBytes(app)
if err != nil {
return "", err
}

// See comment above in driver
return bytesToMi(requestBytes), nil
return bytesToMi(requestBytes + pysparkMemoryBytes), nil
}
43 changes: 43 additions & 0 deletions internal/scheduler/yunikorn/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,49 @@ func TestSchedule(t *testing.T) {
},
},
},
{
name: "spark.executor.pyspark.memory",
app: &v1beta2.SparkApplication{
Spec: v1beta2.SparkApplicationSpec{
Type: v1beta2.SparkApplicationTypePython,
Driver: v1beta2.DriverSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: util.Int32Ptr(1),
Memory: util.StringPtr("512m"),
},
},
Executor: v1beta2.ExecutorSpec{
Instances: util.Int32Ptr(2),
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: util.Int32Ptr(1),
Memory: util.StringPtr("512m"),
},
},
SparkConf: map[string]string{
"spark.executor.pyspark.memory": "500m",
},
},
},
expected: []taskGroup{
{
Name: "spark-driver",
MinMember: 1,
MinResource: map[string]string{
"cpu": "1",
"memory": "896Mi", // 512Mi + 384Mi min overhead
},
},
{
Name: "spark-executor",
MinMember: 2,
MinResource: map[string]string{
"cpu": "1",
// 512Mi + 384Mi min overhead + 500Mi spark.executor.pyspark.memory
"memory": "1396Mi",
},
},
},
},
}

scheduler := &Scheduler{}
Expand Down

0 comments on commit a2f71c6

Please sign in to comment.