Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cluster resource threshold #330

Merged
merged 6 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ object AMConfiguration {

val ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold"

val ACROSS_CLUSTER_TOTAL_MEMORY_PERCENTAGE_THRESHOLD: Double = CommonVars("linkis.yarn.across.cluster.memory.threshold", 0.8).getValue

val ACROSS_CLUSTER_TOTAL_CPU_PERCENTAGE_THRESHOLD: Double = CommonVars("linkis.yarn.across.cluster.cpu.threshold", 0.8).getValue


val ECM_ADMIN_OPERATIONS = CommonVars("wds.linkis.governance.admin.operations", "")

val ENGINE_START_MAX_TIME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,23 +202,37 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
(queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt
)
} else if ("fairScheduler".equals(schedulerType)) {
val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue")
val queue = getQueue(childQueues)
if (queue.isEmpty || queue.get == null) {
logger.debug(s"cannot find any information about queue $queueName, response: " + resp)
throw new RMWarnException(
YARN_NOT_EXISTS_QUEUE.getErrorCode,
MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName)
if ("root".equals(queueName)) {
// get cluster total resource
val queue = (resp \ "scheduler" \ "schedulerInfo" \ "rootQueue")
val rootQueue: Option[JValue] = Some(queue)
val rootQueueInfo = rootQueue.get.asInstanceOf[JObject]
(
getYarnResource(rootQueue.map(_ \ "maxResources")).get,
getYarnResource(rootQueue.map(_ \ "usedResources")).get,
(rootQueueInfo \ "maxApps").asInstanceOf[JInt].values.toInt,
0,
0
)
} else {
val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue")
val queue = getQueue(childQueues)
if (queue.isEmpty || queue.get == null) {
logger.debug(s"cannot find any information about queue $queueName, response: " + resp)
throw new RMWarnException(
YARN_NOT_EXISTS_QUEUE.getErrorCode,
MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName)
)
}
val queueInfo = queue.get.asInstanceOf[JObject]
(
getYarnResource(queue.map(_ \ "maxResources")).get,
getYarnResource(queue.map(_ \ "usedResources")).get,
(queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt,
(queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt,
(queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt
)
}
val queueInfo = queue.get.asInstanceOf[JObject]
(
getYarnResource(queue.map(_ \ "maxResources")).get,
getYarnResource(queue.map(_ \ "usedResources")).get,
(queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt,
(queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt,
(queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt
)
} else {
logger.debug(
s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ class DriverAndYarnReqResourceService(
.isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold)
) {

val clusterYarnResource =
externalResourceService.getResource(
ResourceType.Yarn,
labelContainer,
new YarnResourceIdentifier("root")
)
val (clusterMaxCapacity, clusterUsedCapacity) =
(clusterYarnResource.getMaxResource, clusterYarnResource.getUsedResource)

logger.info(
s"user: $user, creator: $creator task enter cross cluster resource judgment, " +
s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," +
Expand All @@ -106,6 +115,8 @@ class DriverAndYarnReqResourceService(
queueLeftResource.asInstanceOf[YarnResource],
usedCapacity.asInstanceOf[YarnResource],
maxCapacity.asInstanceOf[YarnResource],
clusterMaxCapacity.asInstanceOf[YarnResource],
clusterUsedCapacity.asInstanceOf[YarnResource],
CPUThreshold.toInt,
MemoryThreshold.toInt,
CPUPercentageThreshold.toDouble,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.utils

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.manager.am.conf.AMConfiguration
import org.apache.linkis.manager.common.entity.resource.YarnResource
import org.apache.linkis.manager.common.exception.RMWarnException
import org.apache.linkis.manager.rm.exception.RMErrorCode
Expand All @@ -28,14 +29,37 @@ object AcrossClusterRulesJudgeUtils extends Logging {
leftResource: YarnResource,
usedResource: YarnResource,
maxResource: YarnResource,
clusterMaxCapacity: YarnResource,
clusterUsedCapacity: YarnResource,
leftCPUThreshold: Int,
leftMemoryThreshold: Int,
CPUPercentageThreshold: Double,
MemoryPercentageThreshold: Double
): Unit = {
if (leftResource != null && usedResource != null && maxResource != null) {
val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong
if (
leftResource != null && usedResource != null && maxResource != null && clusterMaxCapacity != null && clusterUsedCapacity != null
) {

val clusterUsedCPUPercentage = clusterUsedCapacity.queueCores
.asInstanceOf[Double] / clusterMaxCapacity.queueCores.asInstanceOf[Double]
val clusterUsedMemoryPercentage = clusterUsedCapacity.queueMemory
.asInstanceOf[Double] / clusterMaxCapacity.queueMemory.asInstanceOf[Double]
val clusterCPUPercentageThreshold =
AMConfiguration.ACROSS_CLUSTER_TOTAL_CPU_PERCENTAGE_THRESHOLD
val clusterMemoryPercentageThreshold =
AMConfiguration.ACROSS_CLUSTER_TOTAL_MEMORY_PERCENTAGE_THRESHOLD

if (
clusterUsedCPUPercentage > clusterCPUPercentageThreshold && clusterUsedMemoryPercentage > clusterMemoryPercentageThreshold
) {
throw new RMWarnException(
RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode,
s"clusterUsedCPUPercentage: $clusterUsedCPUPercentage, CPUPercentageThreshold: $clusterCPUPercentageThreshold" +
s"clusterUsedMemoryPercentage: $clusterUsedMemoryPercentage, MemoryPercentageThreshold: $clusterMemoryPercentageThreshold"
)
}

val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong
if (leftResource.queueCores > leftCPUThreshold && leftQueueMemory > leftMemoryThreshold) {
val usedCPUPercentage =
usedResource.queueCores.asInstanceOf[Double] / maxResource.queueCores
Expand Down
Loading