diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala index 4b40519a8..7c088205c 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala @@ -16,21 +16,19 @@ */ package org.apache.spark.util -import org.apache.spark.{SparkConf, SparkContext, SparkEnv} -import java.io.File -import java.nio.file.Files -import java.nio.file.LinkOption -import java.nio.file.Path -import java.nio.file.Paths -import java.io.{InputStreamReader, BufferedReader} -import scala.collection.mutable.ListBuffer import java.lang.management.ManagementFactory + +import scala.util.Random + import com.intel.oap._ -object ExecutorManager { +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.internal.Logging + +object ExecutorManager extends Logging { def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds var isTaskSet: Boolean = false - def tryTaskSet(numaInfo: GazelleNumaBindingInfo) = synchronized { + def tryTaskSet(numaInfo: GazelleNumaBindingInfo): Unit = synchronized { if (numaInfo.enableNumaBinding && !isTaskSet) { val cmd_output = Utils.executeAndGetOutput( @@ -41,19 +39,16 @@ object ExecutorManager { tmp.toList.distinct } val executorId = SparkEnv.get.executorId - val numCorePerExecutor = numaInfo.numCoresPerExecutor val coreRange = numaInfo.totalCoreRange val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) % coreRange.size - //val coreStartIdx = coreRange(shouldBindNumaIdx)._1 - //val coreEndIdx = coreRange(shouldBindNumaIdx)._2 - System.out.println( - s"executorId is ${executorId}, executorIdOnLocalNode is ${executorIdOnLocalNode}") + logInfo(s"executorId is $executorId, executorIdOnLocalNode is $executorIdOnLocalNode") val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} ${getProcessId()}" - System.out.println(taskSetCmd) + logInfo(s"taskSetCmd is $taskSetCmd") isTaskSet = true Utils.executeCommand(Seq("bash", "-c", taskSetCmd)) } } + def getProcessId(): Int = { val runtimeMXBean = ManagementFactory.getRuntimeMXBean() runtimeMXBean.getName().split("@")(0).toInt