diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala index edd3c001b5..4cbe0b138a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala @@ -37,6 +37,11 @@ object AMConfiguration { val ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold" + val ACROSS_CLUSTER_TOTAL_MEMORY_PERCENTAGE_THRESHOLD: Double = CommonVars("linkis.yarn.across.cluster.memory.threshold", 0.8).getValue + + val ACROSS_CLUSTER_TOTAL_CPU_PERCENTAGE_THRESHOLD: Double = CommonVars("linkis.yarn.across.cluster.cpu.threshold", 0.8).getValue + + val ECM_ADMIN_OPERATIONS = CommonVars("wds.linkis.governance.admin.operations", "") val ENGINE_START_MAX_TIME = diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index a6ed7c7512..209a4a4141 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -202,23 +202,37 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { (queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt ) } else if ("fairScheduler".equals(schedulerType)) { - val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") - val queue = getQueue(childQueues) - if (queue.isEmpty || queue.get == null) { - logger.debug(s"cannot find any information about queue $queueName, response: " + resp) - throw new RMWarnException( - YARN_NOT_EXISTS_QUEUE.getErrorCode, - MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) + if ("root".equals(queueName)) { + // get cluster total resource + val queue = (resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") + val rootQueue: Option[JValue] = Some(queue) + val rootQueueInfo = rootQueue.get.asInstanceOf[JObject] + ( + getYarnResource(rootQueue.map(_ \ "maxResources")).get, + getYarnResource(rootQueue.map(_ \ "usedResources")).get, + (rootQueueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, + 0, + 0 + ) + } else { + val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") + val queue = getQueue(childQueues) + if (queue.isEmpty || queue.get == null) { + logger.debug(s"cannot find any information about queue $queueName, response: " + resp) + throw new RMWarnException( + YARN_NOT_EXISTS_QUEUE.getErrorCode, + MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) + ) + } + val queueInfo = queue.get.asInstanceOf[JObject] + ( + getYarnResource(queue.map(_ \ "maxResources")).get, + getYarnResource(queue.map(_ \ "usedResources")).get, + (queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, + (queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt, + (queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt ) } - val queueInfo = queue.get.asInstanceOf[JObject] - ( - getYarnResource(queue.map(_ \ "maxResources")).get, - getYarnResource(queue.map(_ \ "usedResources")).get, - (queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt - ) } else { logger.debug( s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index b934c4123d..33bd9aff06 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -96,6 +96,15 @@ class DriverAndYarnReqResourceService( .isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold) ) { + val clusterYarnResource = + externalResourceService.getResource( + ResourceType.Yarn, + labelContainer, + new YarnResourceIdentifier("root") + ) + val (clusterMaxCapacity, clusterUsedCapacity) = + (clusterYarnResource.getMaxResource, clusterYarnResource.getUsedResource) + logger.info( s"user: $user, creator: $creator task enter cross cluster resource judgment, " + s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," + @@ -106,6 +115,8 @@ class DriverAndYarnReqResourceService( queueLeftResource.asInstanceOf[YarnResource], usedCapacity.asInstanceOf[YarnResource], maxCapacity.asInstanceOf[YarnResource], + clusterMaxCapacity.asInstanceOf[YarnResource], + clusterUsedCapacity.asInstanceOf[YarnResource], CPUThreshold.toInt, MemoryThreshold.toInt, CPUPercentageThreshold.toDouble, diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala index 805785299d..4173144125 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.rm.utils import org.apache.linkis.common.utils.Logging +import org.apache.linkis.manager.am.conf.AMConfiguration import org.apache.linkis.manager.common.entity.resource.YarnResource import org.apache.linkis.manager.common.exception.RMWarnException import org.apache.linkis.manager.rm.exception.RMErrorCode @@ -28,14 +29,37 @@ object AcrossClusterRulesJudgeUtils extends Logging { leftResource: YarnResource, usedResource: YarnResource, maxResource: YarnResource, + clusterMaxCapacity: YarnResource, + clusterUsedCapacity: YarnResource, leftCPUThreshold: Int, leftMemoryThreshold: Int, CPUPercentageThreshold: Double, MemoryPercentageThreshold: Double ): Unit = { - if (leftResource != null && usedResource != null && maxResource != null) { - val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong + if ( + leftResource != null && usedResource != null && maxResource != null && clusterMaxCapacity != null && clusterUsedCapacity != null + ) { + + val clusterUsedCPUPercentage = clusterUsedCapacity.queueCores + .asInstanceOf[Double] / clusterMaxCapacity.queueCores.asInstanceOf[Double] + val clusterUsedMemoryPercentage = clusterUsedCapacity.queueMemory + .asInstanceOf[Double] / clusterMaxCapacity.queueMemory.asInstanceOf[Double] + val clusterCPUPercentageThreshold = + AMConfiguration.ACROSS_CLUSTER_TOTAL_CPU_PERCENTAGE_THRESHOLD + val clusterMemoryPercentageThreshold = + AMConfiguration.ACROSS_CLUSTER_TOTAL_MEMORY_PERCENTAGE_THRESHOLD + + if ( + clusterUsedCPUPercentage > clusterCPUPercentageThreshold && clusterUsedMemoryPercentage > clusterMemoryPercentageThreshold + ) { + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + s"clusterUsedCPUPercentage: $clusterUsedCPUPercentage, CPUPercentageThreshold: $clusterCPUPercentageThreshold" + + s"clusterUsedMemoryPercentage: $clusterUsedMemoryPercentage, MemoryPercentageThreshold: $clusterMemoryPercentageThreshold" + ) + } + val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong if (leftResource.queueCores > leftCPUThreshold && leftQueueMemory > leftMemoryThreshold) { val usedCPUPercentage = usedResource.queueCores.asInstanceOf[Double] / maxResource.queueCores