Skip to content

Commit

Permalink
Address comments to fix code formats
Browse files Browse the repository at this point in the history
  • Loading branch information
colorant committed Jan 14, 2014
1 parent 161ab93 commit 4c22c55
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
appContext
}

def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = {
val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
def calculateAMMemory(newApp: GetNewApplicationResponse): Int = {
val minResMemory = newApp.getMinimumResourceCapability().getMemory()
val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
YarnAllocationHandler.MEMORY_OVERHEAD)
YarnAllocationHandler.MEMORY_OVERHEAD)
amMemory
}

def setupSecurityToken(amContainer :ContainerLaunchContext) = {
def setupSecurityToken(amContainer: ContainerLaunchContext) = {
// Setup security tokens.
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
Expand Down Expand Up @@ -154,7 +154,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
)

val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ trait ClientBase extends Logging {
val args: ClientArguments
val conf: Configuration
val sparkConf: SparkConf

//var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
Expand Down Expand Up @@ -140,9 +138,10 @@ trait ClientBase extends Logging {
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
return false
false
} else {
true
}
return true
}

/** Copy the file into HDFS if needed. */
Expand All @@ -169,7 +168,7 @@ trait ClientBase extends Logging {
destPath
}

def qualifyForLocal(localURI : URI): Path = {
def qualifyForLocal(localURI: URI): Path = {
var qualifiedURI = localURI
// If not specified assume these are in the local filesystem to keep behavior like Hadoop
if (qualifiedURI.getScheme() == null) {
Expand Down Expand Up @@ -296,9 +295,9 @@ trait ClientBase extends Logging {
retval.toString
}

def calculateAMMemory(newApp: GetNewApplicationResponse) :Int
def calculateAMMemory(newApp: GetNewApplicationResponse): Int

def setupSecurityToken(amContainer :ContainerLaunchContext)
def setupSecurityToken(amContainer: ContainerLaunchContext)

def createContainerLaunchContext(
newApp: GetNewApplicationResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ trait WorkerRunnableUtil extends Logging {
val sparkConf: SparkConf
lazy val env = prepareEnvironment

def prepareCommand(masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int) = {
def prepareCommand(
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
queueInfo.getMaximumCapacity,
queueInfo.getApplications.size,
queueInfo.getChildQueues.size))
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
queueInfo.getMaximumCapacity,
queueInfo.getApplications.size,
queueInfo.getChildQueues.size))
}

def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = {
Expand All @@ -124,7 +124,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
args.amMemory
}

def setupSecurityToken(amContainer :ContainerLaunchContext) = {
def setupSecurityToken(amContainer: ContainerLaunchContext) = {
// Setup security tokens.
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
Expand Down Expand Up @@ -160,7 +160,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
)

val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
Expand Down

0 comments on commit 4c22c55

Please sign in to comment.