Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster information should handle dynamic allocation and nodes being removed and added #1369

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ mvn -Dbuildver=351 clean package

Run `mvn help:all-profiles` to list supported Spark versions.

### Running tests

Unit tests run by default as part of the build; they can be skipped explicitly by specifying `-DskipTests`.

To run an individual test suite, specify the `-Dsuites` option:

```bash
mvn test -Dsuites=com.nvidia.spark.rapids.tool.qualification.QualificationSuite
```


### Setting up an Integrated Development Environment

Before proceeding with importing spark-rapids-tools into IDEA or switching to a different Spark release
Expand Down
130 changes: 87 additions & 43 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ object PlatformNames {
)
}

// Dynamic allocation settings extracted from the Spark properties. The max/min/initial
// executor counts are kept as strings; they hold "N/A" when dynamic allocation is disabled.
case class DynamicAllocationInfo(enabled: Boolean, max: String, min: String, initial: String)

// Resource information and the name of the CSP instance type; for on-prem it's
// the executor information instead, since we can't recommend node types there.
case class InstanceInfo(cores: Int, memoryMB: Long, name: String, numGpus: Int)
Expand Down Expand Up @@ -211,7 +213,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def getRetainedSystemProps: Set[String] = Set.empty


def getExecutorHeapMemoryMB(sparkProperties: Map[String, String]): Long = {
// Potentially enhance this to handle if no config then check the executor
// added or resource profile added events for the heap size
Expand Down Expand Up @@ -288,28 +289,20 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
math.max(1, gpus)
}

// Get the number of worker nodes used in the source cluster. Explicitly supplied
// cluster properties win over what was inferred from the event log; if neither is
// available we fall back to a single node.
def getSourceNumNodes(): Int = {
  clusterProperties
    .map(props => Math.max(1, props.system.numWorkers))
    .orElse(clusterInfoFromEventLog.map(_.numWorkerNodes))
    .getOrElse(1)
}

// we want to keep the number of executors used between runs the same
def getNumExecutorInstances(sparkProperties: Map[String, String]): Int = {
val dynamicAllocationEnabled = Platform.isDynamicAllocationEnabled(sparkProperties)
val execInstFromProps = sparkProperties.get("spark.executor.instances")
// If the cluster properties were specified make sure to use those and not
// the eventlog inference. This is broken in my mind but is backwards compatible,
// or maybe use number gpus per node as an improvement.
if (clusterProperties.isDefined) {
val numWorkers = Math.max(1, clusterProperties.get.system.numWorkers)
this.numGpus * numWorkers
} else if (execInstFromProps.isDefined) {
} else if (execInstFromProps.isDefined && !dynamicAllocationEnabled) {
execInstFromProps.get.toInt
} else if (clusterInfoFromEventLog.isDefined) {
val clusterInfo = clusterInfoFromEventLog.get
clusterInfo.numWorkerNodes * clusterInfo.numExecsPerNode
clusterInfoFromEventLog.get.numExecutors
} else {
// not sure so don't set it
0
Expand Down Expand Up @@ -338,24 +331,28 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],

def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
val driverHost = sparkProperties.get("spark.driver.host")
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, driverHost = driverHost)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs, numWorkerNodes,
executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, driverHost = driverHost)
}

// set the cluster information for this platform based on what we found in the
// eventlog
def configureClusterInfoFromEventLog(coresPerExecutor: Int,
execsPerNode: Int,
numExecs: Int,
numExecutorNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): Unit = {
clusterInfoFromEventLog = Some(createClusterInfo(coresPerExecutor, execsPerNode,
numExecutorNodes, sparkProperties, systemProperties))
numExecs, numExecutorNodes, sparkProperties, systemProperties))
}

override def toString: String = {
Expand Down Expand Up @@ -383,17 +380,30 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def getGPUInstanceTypeRecommendation(
sparkProperties: Map[String, String]): Option[RecommendedClusterInfo] = {
val initialNumExecInstances = getNumExecutorInstances(sparkProperties)
val vendor = clusterInfoFromEventLog.map(_.vendor).getOrElse("")
val numExecs = getNumExecutorInstances(sparkProperties)
// If the cluster properties were specified make sure to use those and not
// the eventlog inference. This is broken in my mind but is backwards compatible,
// or maybe use number gpus per node as an improvement.
val origClusterNumExecsPerNode = clusterInfoFromEventLog.map(_.numExecsPerNode).getOrElse(1)
val numExecsPerNode = if (clusterProperties.isEmpty) {
clusterInfoFromEventLog.map(_.numExecsPerNode).getOrElse(1)
// numExecsPerNode can be -1 if dynamic allocation so just make it 1 for
// this set of calculations. However if we are on a CSP then we want to recommend
// the best size machine so use the number of GPUs as proxy to be the number of executors
// we could put on a node.
if (origClusterNumExecsPerNode == -1) {
maxGpusSupported
} else {
origClusterNumExecsPerNode
}
} else {
1
}
// onprem yarn multi-tenant vs yarn static cluster (dataproc) for just that application
// should be handled automatically unless heterogeneous nodes
val gpusToUse =
Math.max(this.numGpus, Math.min(numExecsPerNode, maxGpusSupported))

// update the global numGpus based on the instance type we are using
this.numGpus = gpusToUse
val nodeCores = if (clusterProperties.isDefined) {
Expand Down Expand Up @@ -424,11 +434,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
logWarning("cluster information from event log is missing, executor cores set to 0!")
0
}
val numExecsPerNode = if (clusterInfoFromEventLog.isDefined) {
clusterInfoFromEventLog.get.numExecsPerNode
} else {
1
}
val nodeCoresToUse = execCores * gpusToUse
val nodeMemMB = getMemoryMBPerNode(sparkProperties)
// It's possible if a cpu run was used, it could run with multiple executors, but
Expand All @@ -454,29 +459,37 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
} else {
instanceInfoOpt
}
val numExistingNodes = getSourceNumNodes
// check if instance type supports that number of gpus, if not we add extra executors
val (numExecs, numNodes) = if (finalInstanceInfo.get.numGpus >= numExecsPerNode) {
// TODO - really if instance has more GPUs we should calculate the other way to
// recommend less nodes but leave that open for now
(initialNumExecInstances, numExistingNodes)
} else {
// just flatten to use 1 but we should really see if multiples
val numGpusLeft = numExecsPerNode / finalInstanceInfo.get.numGpus
(initialNumExecInstances, numExistingNodes * numGpusLeft)
}
// note this is going over as for instance if you have 4 gpus per node but only need
// 10 executors, this would tell you to allocate enough to fit 12.
val numNodes = math.ceil(numExecs.toDouble / finalInstanceInfo.get.numGpus).toInt
val coresPerExec = if (finalInstanceInfo.isDefined) {
finalInstanceInfo.get.cores / finalInstanceInfo.get.numGpus
// We may not be able to match instance type up exactly, this means the number of
// cores per executor could come out to be more then the original application.
// For now we want the cores per executor to stay the same as original app so if
// that is set, use it first.
if (clusterInfoFromEventLog.isDefined) {
clusterInfoFromEventLog.get.coresPerExecutor
} else {
finalInstanceInfo.get.cores / finalInstanceInfo.get.numGpus
}
} else {
1
}
val finalNumNodes = if (vendor == PlatformNames.ONPREM) {
// if its onprem we really have no idea of the size of the cluster
-1
} else {
numNodes
}
if (numExecs > 0) {
val vendor = clusterInfoFromEventLog.map(_.vendor).getOrElse("")
val instanceName = finalInstanceInfo.map(_.name).getOrElse("")
val numGpus = finalInstanceInfo.map(_.numGpus).getOrElse(1)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
// Num of executors per node is the number of GPUs
recommendedClusterInfo = Some(RecommendedClusterInfo(vendor, coresPerExec,
numNodes, numGpus, numExecs, gpuDevice = getGpuOrDefault.toString,
finalNumNodes, numGpus, numExecs, gpuDevice = getGpuOrDefault.toString,
dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial,
workerNodeType = Some(instanceName)))
recommendedNodeInstanceInfo = finalInstanceInfo
recommendedClusterInfo
Expand Down Expand Up @@ -505,6 +518,7 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],

override def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
Expand All @@ -514,8 +528,11 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
val driverHost = sparkProperties.get("spark.driver.host")
val clusterName = sparkProperties.get(DatabricksParseHelper.PROP_TAG_CLUSTER_NAME_KEY)
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, driverNodeType, workerNodeType, driverHost, clusterId, clusterName)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs, numWorkerNodes,
executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, driverNodeType,
workerNodeType, driverHost, clusterId, clusterName)
}
}

Expand Down Expand Up @@ -629,14 +646,18 @@ class EmrPlatform(gpuDevice: Option[GpuDevice],

override def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
val clusterId = systemProperties.get("EMR_CLUSTER_ID")
val driverHost = sparkProperties.get("spark.driver.host")
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, clusterId = clusterId, driverHost = driverHost)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs,
numWorkerNodes, executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, clusterId = clusterId,
driverHost = driverHost)
}

override def getInstanceByResources(
Expand Down Expand Up @@ -672,6 +693,29 @@ class OnPremPlatform(gpuDevice: Option[GpuDevice],
override def maxGpusSupported: Int = 1
}

object Platform {
  /**
   * Returns true if dynamic allocation is explicitly enabled via
   * `spark.dynamicAllocation.enabled`.
   *
   * Uses a case-insensitive comparison instead of `String.toBoolean` so that a
   * malformed value is treated as disabled rather than throwing
   * IllegalArgumentException.
   */
  def isDynamicAllocationEnabled(sparkProperties: Map[String, String]): Boolean = {
    sparkProperties.get("spark.dynamicAllocation.enabled").exists(_.trim.equalsIgnoreCase("true"))
  }

  /**
   * Extracts the dynamic allocation settings from the Spark properties.
   *
   * When dynamic allocation is enabled, max defaults to Int.MaxValue and min to 0,
   * matching Spark's defaults. The initial executor count follows Spark's documented
   * semantics: the maximum of `spark.dynamicAllocation.initialExecutors`,
   * `spark.dynamicAllocation.minExecutors` and `spark.executor.instances`.
   * When dynamic allocation is disabled, the numeric fields are all "N/A".
   */
  def getDynamicAllocationSettings(sparkProperties: Map[String, String]): DynamicAllocationInfo = {
    val dynamicAllocationEnabled = isDynamicAllocationEnabled(sparkProperties)
    if (dynamicAllocationEnabled) {
      val dynamicAllocationMax = sparkProperties
        .getOrElse("spark.dynamicAllocation.maxExecutors", Int.MaxValue.toString)
      val dynamicAllocationMin = sparkProperties
        .getOrElse("spark.dynamicAllocation.minExecutors", "0")
      // Spark takes the max of initialExecutors, minExecutors and executor.instances
      // as the actual initial executor count. Non-numeric values are ignored; if
      // nothing parses we fall back to the min setting.
      val dynamicAllocationInit = {
        val candidates = Seq(
          sparkProperties.get("spark.dynamicAllocation.initialExecutors"),
          sparkProperties.get("spark.executor.instances"),
          Some(dynamicAllocationMin)
        ).flatten.flatMap(v => scala.util.Try(v.trim.toInt).toOption)
        if (candidates.nonEmpty) candidates.max.toString else dynamicAllocationMin
      }
      DynamicAllocationInfo(dynamicAllocationEnabled, dynamicAllocationMax,
        dynamicAllocationMin, dynamicAllocationInit)
    } else {
      DynamicAllocationInfo(dynamicAllocationEnabled, "N/A", "N/A", "N/A")
    }
  }
}

/**
* Factory for creating instances of different platforms.
* This factory supports various platforms and provides methods for creating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,14 +1393,12 @@ object AutoTuner extends Logging {
}

def buildAutoTuner(
workerInfoFilePath: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.createInstance(clusterProperties = None),
platform: Platform,
driverInfoProvider: DriverLogInfoProvider = BaseDriverLogInfoProvider.noneDriverLog
): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(workerInfoFilePath)
val autoT = new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()),
val autoT = new AutoTuner(platform.clusterProperties.getOrElse(new ClusterProperties()),
singleAppProvider, platform, driverInfoProvider)
autoT
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) {
val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo)
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val clusterPropsOpt = loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
PlatformFactory.createInstance(platform, clusterPropsOpt), driverInfoProvider)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider,
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider)

// The autotuner allows skipping some properties,
// e.g., getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object OpSuppLevel extends Enumeration {
* by the plugin which lists the formats and types supported.
* The class also supports a custom speedup factor file as input.
*/
class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance(),
class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
speedupFactorFile: Option[String] = None) extends Logging {
private val NONE = "None"

Expand Down
Loading