
Merge pull request alteryx#191 from hsaputra/removesemicolonscala
Cleanup to remove semicolons (;) from Scala code

-) The main reason for this PR is to remove semicolons from single statements of Scala code.
-) Remove unused imports as I see them
-) Fix the ASF comment header in some files (bad copy-paste, I suppose)
(cherry picked from commit 4b89501)

Conflicts:

	examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala

Squash into 191
mateiz authored and pwendell committed Dec 7, 2013
1 parent 2b76315 commit 20d1f8b
Showing 19 changed files with 64 additions and 68 deletions.
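
For orientation before the per-file diffs, here is a minimal Scala sketch of the style change being applied; the object and method names (Before, After, report) are hypothetical stand-ins, not code from this commit:

import scala.collection.mutable.HashMap  // an unused import -- the kind this cleanup deletes

// Before: trailing semicolons on single statements (legal Scala, but unidiomatic)
object Before {
  def report(records: Seq[Int]) {
    val total = records.sum;
    println("total = " + total);
  }
}

// After: semicolons dropped; Scala treats the end of the line as the end of the statement
object After {
  def report(records: Seq[Int]) {
    val total = records.sum
    println("total = " + total)
  }
}

Note that Scala still requires a semicolon in a few places, such as between the generators of a parenthesized multi-line for-comprehension; that is why the CartesianRDD hunk below leaves the semicolon after its first generator untouched.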
@@ -1,19 +1,19 @@
/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/

@@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging {
extends FileClientHandler with Logging {

override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
}

@@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
- y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+ y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

override def getDependencies: Seq[Dependency[_]] = List(
@@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
- <h4>Tasks</h4> ++ taskTable;
+ <h4>Tasks</h4> ++ taskTable

headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
}
@@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _

override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
}

10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
.filter(_ >= 0.0)

// Run the partitions, including the consecutive empty ones, through StatCounter
- val stats: StatCounter = rdd.stats();
- assert(abs(6.0 - stats.sum) < 0.01);
- assert(abs(6.0/2 - rdd.mean) < 0.01);
- assert(abs(1.0 - rdd.variance) < 0.01);
- assert(abs(1.0 - rdd.stdev) < 0.01);
+ val stats: StatCounter = rdd.stats()
+ assert(abs(6.0 - stats.sum) < 0.01)
+ assert(abs(6.0/2 - rdd.mean) < 0.01)
+ assert(abs(1.0 - rdd.variance) < 0.01)
+ assert(abs(1.0 - rdd.stdev) < 0.01)

// Add other tests here for classes that should be able to handle empty partitions correctly
}
@@ -120,7 +120,7 @@ object LocalALS {
System.exit(1)
}
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)

val R = generateR()

@@ -65,7 +65,7 @@ object SparkTC {
oldCount = nextCount
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
nextCount = tc.count()
} while (nextCount != oldCount)

@@ -120,7 +120,7 @@ object FeederActor {

println("Feeder started as:" + feeder)

- actorSystem.awaitTermination();
+ actorSystem.awaitTermination()
}
}

@@ -92,7 +92,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
fs.delete(file, false)
fs.rename(writeFile, file)

- val finishTime = System.currentTimeMillis();
+ val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
return
@@ -122,7 +122,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging {

def stop() {
synchronized {
- if (stopped) return ;
+ if (stopped) {
+   return
+ }
stopped = true
}
executor.shutdown()
@@ -17,7 +17,7 @@

package org.apache.spark.streaming.api.java

- import java.lang.{Long => JLong, Integer => JInt}
+ import java.lang.{Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap, List => JList}

@@ -33,10 +33,9 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
- import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
+ import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
- import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}

/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -302,7 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmf: ClassManifest[F] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
- ssc.fileStream[K, V, F](directory);
+ ssc.fileStream[K, V, F](directory)
}

/**
@@ -137,8 +137,8 @@ class FlumeReceiver(

protected override def onStart() {
val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this));
- val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
server.start()
logInfo("Flume receiver started")
@@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort));
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver);
+ classOf[AvroSourceProtocol], transceiver)

for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
@@ -251,7 +251,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {

Thread.sleep(500) // Give some time for the forgetting old RDDs to complete
} catch {
- case e: Exception => e.printStackTrace(); throw e;
+ case e: Exception => {e.printStackTrace(); throw e}
} finally {
ssc.stop()
}
@@ -21,6 +21,7 @@ import java.io.IOException
import java.net.Socket
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
@@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._

class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
@@ -65,7 +67,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()

// Workaround until hadoop moves to something which has
@@ -195,7 +197,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
successed = true
} finally {
logDebug("finishing main")
- isLastAMRetry = true;
+ isLastAMRetry = true
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
29 changes: 13 additions & 16 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,14 +17,13 @@

package org.apache.spark.deploy.yarn

- import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master
- import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
@@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.JavaConversions._

- import org.apache.spark.Logging
- import org.apache.spark.util.Utils
- import org.apache.spark.deploy.SparkHadoopUtil
+ import org.apache.spark.Logging

class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {

@@ -123,7 +120,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

// if we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
logError("the worker size is to large to run on this cluster " + args.workerMemory);
logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -160,8 +157,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
} catch {
case e: UnknownHostException =>
return false
@@ -178,7 +175,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}

/**
@@ -190,13 +187,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf);
+ val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// resolve any symlinks in the URI path so using a "current" symlink
@@ -214,7 +211,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add them as local resources to the AM
val fs = FileSystem.get(conf)

- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@ -226,7 +223,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
@@ -286,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}

- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}

@@ -366,7 +363,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}

// Command for the ApplicationMaster
var javaCommand = "java";
var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
@@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging {
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache);
+ val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
- return true;
+ return true
}
return false
}
