[SPARK-47208][CORE] Allow overriding base overhead memory
### What changes were proposed in this pull request?
The desired overhead memory can already be set directly via the `spark.driver/executor.memoryOverhead` flags. However, if that flag is not present, the overhead memory is calculated as follows:

```
overhead_memory = Max(384, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor')

[where the 'memoryOverheadFactor' flag defaults to 0.1]
```

This PR adds two new Spark configs, `spark.driver.minMemoryOverhead` and `spark.executor.minMemoryOverhead`, which can be used to override the 384 MiB minimum value.

The memory overhead calculation now becomes:

```
min_memory = sparkConf.get('spark.driver/executor.minMemoryOverhead').getOrElse(384)

overhead_memory = Max(min_memory, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor')
```
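For illustration, here is a minimal Scala sketch of the selection logic together with a worked example; the function and parameter names are illustrative, not the actual Spark internals (which live in `ResourceProfile.calculateOverHeadMemory`):

```
// Minimal sketch of the new overhead selection logic (illustrative names).
def overheadMiB(
    explicitOverhead: Option[Long], // spark.{driver,executor}.memoryOverhead
    minOverhead: Long,              // spark.{driver,executor}.minMemoryOverhead (defaults to 384)
    memoryMiB: Long,                // spark.{driver,executor}.memory, in MiB
    factor: Double): Long =         // spark.{driver,executor}.memoryOverheadFactor (defaults to 0.1)
  explicitOverhead.getOrElse(math.max((factor * memoryMiB).toLong, minOverhead))

// e.g. 2048 MiB of JVM memory, default factor 0.1, minimum raised to 600 MiB:
// overheadMiB(None, 600L, 2048L, 0.1) == math.max(204L, 600L) == 600
```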

### Why are the changes needed?
There are certain scenarios where being able to override the 384 MiB minimum directly can be beneficial. For example, a workload that performs a lot of off-heap operations (e.g. package managers, native compression/decompression) may not need a large JVM heap but may still need a significant amount of memory on the Spark node.

Using the `memoryOverheadFactor` config flag may not be appropriate in such cases, since, for cost-saving or resource-limitation reasons, we may not want the overhead allocation to scale directly with JVM memory.
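As a usage sketch (the values here are illustrative), the minimum can be raised without also raising the factor:

```
import org.apache.spark.SparkConf

// Illustrative values: keep a modest JVM heap but guarantee 2 GiB of off-heap
// headroom, without making the overhead scale with executor memory via
// memoryOverheadFactor.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.minMemoryOverhead", "2g") // new flag added by this PR
```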

### Does this PR introduce _any_ user-facing change?
Yes, as described above, two new flags have been added to the Spark configuration. No existing behaviour is broken.

### How was this patch tested?
Added tests for three cases (worked through numerically below):
- If `spark.driver/executor.memoryOverhead` is set, the new changes have no effect.
- If `spark.driver/executor.minMemoryOverhead` is set and its value is higher than `spark.driver/executor.memory` * `spark.driver/executor.memoryOverheadFactor`, the total memory will be the allocated JVM memory + `spark.driver/executor.minMemoryOverhead`.
- If `spark.driver/executor.minMemoryOverhead` is set but its value is lower than `spark.driver/executor.memory` * `spark.driver/executor.memoryOverheadFactor`, the total memory will be the allocated JVM memory + `spark.driver/executor.memory` * `spark.driver/executor.memoryOverheadFactor`.
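To make these cases concrete, here they are worked through with the `overheadMiB` sketch from above (numbers are illustrative, factor fixed at its 0.1 default):

```
// Case 1: explicit overhead wins outright.
assert(overheadMiB(Some(100L), 500L, 4096L, 0.1) == 100L)
// Case 2: minimum dominates the factor: max(0.1 * 1024 = 102, 500) = 500.
assert(overheadMiB(None, 500L, 1024L, 0.1) == 500L)
// Case 3: factor dominates the minimum: max(0.1 * 10240 = 1024, 500) = 1024.
assert(overheadMiB(None, 500L, 10240L, 0.1) == 1024L)
```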

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45240 from jpcorreia99/jcorrreia/MinOverheadMemoryOverride.

Authored-by: jpcorreia99 <jpcorreia99@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
jpcorreia99 authored and tgravescs committed Mar 14, 2024
1 parent 63b79c1 commit 4b073fd
Showing 12 changed files with 252 additions and 23 deletions.
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -117,6 +117,14 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val DRIVER_MIN_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.minMemoryOverhead")
.doc("The minimum amount of non-heap memory to be allocated per driver in cluster mode, " +
"in MiB unless otherwise specified. This value is ignored if " +
"spark.driver.memoryOverhead is set directly.")
.version("4.0.0")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("384m")

private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.driver.memoryOverheadFactor")
.doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " +
@@ -358,6 +366,15 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD =
ConfigBuilder("spark.executor.minMemoryOverhead")
.doc("The minimum amount of non-heap memory to be allocated per executor " +
"in MiB unless otherwise specified. This value is ignored if " +
"spark.executor.memoryOverhead is set directly.")
.version("4.0.0")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("384m")

private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.executor.memoryOverheadFactor")
.doc("Fraction of executor memory to be allocated as additional non-heap memory per " +
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -352,8 +352,6 @@ object ResourceProfile extends Logging {
val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0

private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L

private lazy val nextProfileId = new AtomicInteger(0)
private val DEFAULT_PROFILE_LOCK = new Object()

@@ -489,10 +487,11 @@

private[spark] def calculateOverHeadMemory(
overHeadMemFromConf: Option[Long],
minimumOverHeadMemoryFromConf: Long,
executorMemoryMiB: Long,
overheadFactor: Double): Long = {
overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
minimumOverHeadMemoryFromConf))
}

/**
@@ -504,6 +503,7 @@
private[spark] def getResourcesForClusterManager(
rpId: Int,
execResources: Map[String, ExecutorResourceRequest],
minimumOverheadMemory: Long,
overheadFactor: Double,
conf: SparkConf,
isPythonApp: Boolean,
@@ -515,7 +515,7 @@
var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L)
var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB,
executorMemoryMiB, overheadFactor)
minimumOverheadMemory, executorMemoryMiB, overheadFactor)

val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) {
val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -98,7 +98,7 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
new ExecutorResourceRequests().cores(4)
val rp = rpBuilder.require(taskReq).require(execReq).build()
val executorResourceForRp = ResourceProfile.getResourcesForClusterManager(
rp.id, rp.executorResources, 0.0, sparkConf, false, Map.empty)
rp.id, rp.executorResources, 500L, 0.0, sparkConf, false, Map.empty)
// Standalone cluster only take cores and executor memory as built-in resources.
assert(executorResourceForRp.cores.get === 4)
assert(executorResourceForRp.executorMemoryMiB === 1024L)
22 changes: 20 additions & 2 deletions docs/configuration.md
@@ -187,7 +187,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of <code>spark.driver.minMemoryOverhead</code></td>
<td>
Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
@@ -202,6 +202,15 @@
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.driver.minMemoryOverhead</code></td>
<td>384m</td>
<td>
The minimum amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified, if <code>spark.driver.memoryOverhead</code> is not defined.
This option is currently supported on YARN and Kubernetes.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverheadFactor</code></td>
<td>0.10</td>
@@ -291,7 +300,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of <code>spark.executor.minMemoryOverhead</code></td>
<td>
Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
@@ -306,6 +315,15 @@
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.executor.minMemoryOverhead</code></td>
<td>384m</td>
<td>
The minimum amount of non-heap memory to be allocated per executor process, in MiB unless otherwise specified, if <code>spark.executor.memoryOverhead</code> is not defined.
This option is currently supported on YARN and Kubernetes.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverheadFactor</code></td>
<td>0.10</td>
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

@@ -67,17 +66,19 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
conf.get(MEMORY_OVERHEAD_FACTOR)
}

private val driverMinimumMemoryOverhead = conf.get(DRIVER_MIN_MEMORY_OVERHEAD)

// Prefer the driver memory overhead factor if set explicitly
private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
} else {
defaultOverheadFactor
}

private val memoryOverheadMiB = conf
private val memoryOverheadMiB: Long = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((memoryOverheadFactor * driverMemoryMiB).toInt,
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
driverMinimumMemoryOverhead))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -59,6 +59,7 @@ private[spark] class BasicExecutorFeatureStep(
private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
private val minimumMemoryOverhead = kubernetesConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
} else {
@@ -68,6 +69,7 @@
val execResources = ResourceProfile.getResourcesForClusterManager(
resourceProfile.id,
resourceProfile.executorResources,
minimumMemoryOverhead,
memoryOverheadFactor,
kubernetesConf.sparkConf,
isPythonApp,
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.resource.{ResourceID, ResourceProfile}
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.util.Utils

@@ -205,7 +205,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
test(s"memory overhead factor new config: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -235,7 +236,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
test(s"memory overhead factor old config: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -259,7 +260,8 @@
test(s"SPARK-38194: memory overhead factor precendence") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -288,7 +290,8 @@
test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -372,6 +375,78 @@
path.startsWith(FILE_UPLOAD_PATH) && path.endsWith("some-local-jar.jar")))
}

test("SPARK-47208: User can override the minimum memory overhead of the driver") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(DRIVER_MEMORY.key, "256M")
.set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = CUSTOM_DRIVER_LABELS,
environment = DRIVER_ENVS,
annotations = DRIVER_ANNOTATIONS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
val configuredPod = featureStep.configurePod(basePod)

val resourceRequirements = configuredPod.container.getResources
val requests = resourceRequirements.getRequests.asScala
assert(amountAndFormat(requests("memory")) === "756Mi")
val limits = resourceRequirements.getLimits.asScala
assert(amountAndFormat(limits("memory")) === "756Mi")
}

test("SPARK-47208: Explicit overhead takes precedence over minimum overhead") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(DRIVER_MEMORY.key, "256M")
.set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
.set(DRIVER_MEMORY_OVERHEAD, 200L)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = CUSTOM_DRIVER_LABELS,
environment = DRIVER_ENVS,
annotations = DRIVER_ANNOTATIONS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
val configuredPod = featureStep.configurePod(basePod)

val resourceRequirements = configuredPod.container.getResources
val requests = resourceRequirements.getRequests.asScala
assert(amountAndFormat(requests("memory")) === "456Mi")
val limits = resourceRequirements.getLimits.asScala
assert(amountAndFormat(limits("memory")) === "456Mi")
}

test("SPARK-47208: Overhead is maximum between factor of memory and min base overhead") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(DRIVER_MEMORY.key, "5000M")
.set(DRIVER_MIN_MEMORY_OVERHEAD, 200L)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = CUSTOM_DRIVER_LABELS,
environment = DRIVER_ENVS,
annotations = DRIVER_ANNOTATIONS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
val configuredPod = featureStep.configurePod(basePod)

val resourceRequirements = configuredPod.container.getResources
val requests = resourceRequirements.getRequests.asScala
// mem = 5000 + max(overhead_factor[0.1] * 5000, 200)
assert(amountAndFormat(requests("memory")) === "5500Mi")
val limits = resourceRequirements.getLimits.asScala
assert(amountAndFormat(limits("memory")) === "5500Mi")
}


def containerPort(name: String, portNumber: Int): ContainerPort =
new ContainerPortBuilder()
.withName(name)
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -460,7 +460,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test(s"SPARK-38194: memory overhead factor precendence") {
// Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -487,7 +487,7 @@
test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
// Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
@@ -524,6 +524,45 @@
assert(podConfigured1.container.getPorts.contains(ports))
}

test("SPARK-47208: User can override the minimum memory overhead of the executor") {
// main app resource, overriding the minimum overhead to 500 MiB
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)

val conf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf)
ResourceProfile.clearDefaultProfile()
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
resourceProfile)
val executor = step.configurePod(SparkPod.initialPod())

// memory = 1024 MiB (default) + 500 MiB (minimum overhead, overridden from the default 384 MiB)
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "1524Mi")
}

test("SPARK-47208: Explicit overhead takes precedence over minimum overhead") {
// main app resource, explicit overhead of 150MiB
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_MEMORY_OVERHEAD, 150L)
.set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)

val conf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf)
ResourceProfile.clearDefaultProfile()
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
resourceProfile)
val executor = step.configurePod(SparkPod.initialPod())

// memory = 1024 MiB + 150 MiB (explicit overhead overrides any other overhead calculation)
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "1174Mi")
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)