diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index f320f09f58702..de65ef23ad1ce 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
@@ -26,6 +24,8 @@ import java.util.regex.Pattern
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
@@ -53,7 +53,6 @@ object AllocationType extends Enumeration {
 
 // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
 // more info on how we are requesting for containers.
-
 /**
  * Acquires resources for executors from a ResourceManager and launches executors in new containers.
 */
@@ -505,8 +504,7 @@ private[yarn] class YarnAllocator(
     amClient.allocate(progressIndicator)
   }
 
-  private def createRackResourceRequests(
-      hostContainers: ArrayBuffer[ContainerRequest])
+  private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest])
     : ArrayBuffer[ContainerRequest] = {
     // Generate modified racks and new set of hosts under it before issuing requests.
     val rackToCounts = new HashMap[String, Int]()
@@ -602,8 +600,7 @@ private[yarn] class YarnAllocator(
       requestType: AllocationType.AllocationType,
       resource: String,
       numExecutors: Int,
-      priority: Int)
-    : ArrayBuffer[ContainerRequest] = {
+      priority: Int): ArrayBuffer[ContainerRequest] = {
     // If hostname is specified, then we need at least two requests - node local and rack local.
     // There must be a third request, which is ANY. That will be specially handled.
     requestType match {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index cef065c22efe2..bf4e15908bb46 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -19,6 +19,10 @@ package org.apache.spark.deploy.yarn
 
 import java.util.{List => JList}
 
+import scala.collection.JavaConversions._
+import scala.collection.{Map, Set}
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
@@ -32,10 +36,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
 
-import scala.collection.JavaConversions._
-import scala.collection.{Map, Set}
-import scala.util.Try
-
 /**
  * Handles registering and unregistering the application with the YARN ResourceManager.
  */
@@ -120,7 +120,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }
 
   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration) =
+  def getMaxRegAttempts(conf: YarnConfiguration): Int =
     conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
 }
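
The import hunks in both files apply Spark's import-grouping convention: java, scala, third-party libraries, then org.apache packages, with each group alphabetized and separated by a blank line. A minimal illustrative sketch of the convention follows; the specific imports are examples chosen to mirror the patch, not lines taken from it:

// Illustrative only: Spark's import grouping, as applied by this patch.
import java.util.concurrent.TimeUnit              // 1. java.* (and javax.*)

import scala.collection.mutable.ArrayBuffer       // 2. scala.*

// 3. third-party libraries, e.g. Guava -- which is why ThreadFactoryBuilder
// moves below the scala.* block in YarnAllocator.scala above
import com.google.common.util.concurrent.ThreadFactoryBuilder

import org.apache.hadoop.conf.Configuration       // 4. org.apache.* packages,
import org.apache.spark.Logging                   //    with Spark classes last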