From d8fb40edea7c8c811814f1ff288d59178928964b Mon Sep 17 00:00:00 2001
From: Saisai Shao
Date: Mon, 2 Mar 2015 08:49:19 +0000
Subject: [PATCH 1/3] [Streaming][Minor]Fix some error docs in streaming examples

Small changes, please help to review, thanks a lot.

Author: Saisai Shao

Closes #4837 from jerryshao/doc-fix and squashes the following commits:

545291a [Saisai Shao] Fix some error docs in streaming examples
---
 .../apache/spark/examples/streaming/DirectKafkaWordCount.scala | 3 ++-
 examples/src/main/python/streaming/kafka_wordcount.py          | 2 +-
 python/pyspark/streaming/kafka.py                              | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
index deb08fd57b8c7..1c8a20bf8f1ae 100644
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -30,7 +30,8 @@ import org.apache.spark.SparkConf
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
- *    $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
+ *    topic1,topic2
  */
 object DirectKafkaWordCount {
   def main(args: Array[String]) {
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py
index ed398a82b8bb0..f82f161cb8844 100644
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ b/examples/src/main/python/streaming/kafka_wordcount.py
@@ -23,7 +23,7 @@
    http://kafka.apache.org/documentation.html#quickstart

    and then run the example
-      `$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\
+      `$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/\
       spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
       localhost:2181 test`
 """
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 0002dc10e8a17..f083ed149effb 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -82,7 +82,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},

         2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
            Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
-           Then, innclude the jar in the spark-submit command as
+           Then, include the jar in the spark-submit command as

            $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
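For reference, the two corrected invocations read end to end like this. The broker port and topic names are illustrative placeholders (9092 is Kafka's default broker port), while the --jars path with its wildcards is taken verbatim from the docs above:

```
# Scala example: the object is DirectKafkaWordCount, not KafkaWordCount
$ bin/run-example streaming.DirectKafkaWordCount broker1-host:9092,broker2-host:9092 \
    topic1,topic2

# Python example: --jars puts the Kafka assembly on both the driver and
# executor classpaths, which --driver-class-path alone does not
$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
    examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
```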
From 948c2390ab004ad5cf3876d87c05d3e43a9aa9e0 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 2 Mar 2015 08:51:03 +0000
Subject: [PATCH 2/3] SPARK-3357 [CORE] Internal log messages should be set at DEBUG level instead of INFO

Demote some 'noisy' log messages to debug level. I added a few more, to include everything that gets logged in stanzas like this:

```
15/03/01 00:03:54 INFO BlockManager: Removing broadcast 0
15/03/01 00:03:54 INFO BlockManager: Removing block broadcast_0_piece0
15/03/01 00:03:54 INFO MemoryStore: Block broadcast_0_piece0 of size 839 dropped from memory (free 277976091)
15/03/01 00:03:54 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49524 in memory (size: 839.0 B, free: 265.1 MB)
15/03/01 00:03:54 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/01 00:03:54 INFO BlockManager: Removing block broadcast_0
15/03/01 00:03:54 INFO MemoryStore: Block broadcast_0 of size 1088 dropped from memory (free 277977179)
15/03/01 00:03:54 INFO ContextCleaner: Cleaned broadcast 0
```

as well as regular messages like

```
15/03/01 00:02:33 INFO MemoryStore: ensureFreeSpace(2640) called with curMem=47322, maxMem=278019440
```

WDYT? Good, or should some be left alone? CC mengxr, who suggested some of this.

Author: Sean Owen

Closes #4838 from srowen/SPARK-3357 and squashes the following commits:

dce75c1 [Sean Owen] Back out some debug level changes
d9b784d [Sean Owen] Demote some 'noisy' log messages to debug level
---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala     | 4 ++--
 .../main/scala/org/apache/spark/storage/BlockManager.scala    | 4 ++--
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala   | 2 +-
 .../src/main/scala/org/apache/spark/storage/MemoryStore.scala | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 434f1e47cf822..4a9d007353373 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -188,10 +188,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 86dbd89f0ffb8..c8b7763f03fb7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1074,7 +1074,7 @@ private[spark] class BlockManager(
    * Remove all blocks belonging to the given broadcast.
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
-    logInfo(s"Removing broadcast $broadcastId")
+    logDebug(s"Removing broadcast $broadcastId")
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
@@ -1086,7 +1086,7 @@ private[spark] class BlockManager(
    * Remove a block from both memory and disk.
    */
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
-    logInfo(s"Removing block $blockId")
+    logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b63c7f191155c..654796f23c96e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -61,7 +61,7 @@ class BlockManagerMaster(
       tachyonSize: Long): Boolean = {
     val res = askDriverWithReply[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
-    logInfo("Updated info of block " + blockId)
+    logDebug(s"Updated info of block $blockId")
     res
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71305a46bf570..7f4b6e8bd3683 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -184,7 +184,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val entry = entries.remove(blockId)
     if (entry != null) {
       currentMemory -= entry.size
-      logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
+      logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
       true
     } else {
       false
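Since these messages now log at DEBUG, they vanish under Spark's default INFO threshold. Anyone who still wants them can opt back in per class instead of raising verbosity everywhere. A minimal sketch of conf/log4j.properties overrides, assuming the stock log4j configuration Spark ships with (the logger names simply mirror the classes touched above):

```
# Re-enable the demoted block-management messages for these loggers only
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
log4j.logger.org.apache.spark.storage.BlockManager=DEBUG
log4j.logger.org.apache.spark.storage.BlockManagerMaster=DEBUG
log4j.logger.org.apache.spark.storage.MemoryStore=DEBUG
```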
From 49c7a8f6f33d64d7e6c35391f83121440844a41d Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Mon, 2 Mar 2015 09:06:56 +0000
Subject: [PATCH 3/3] [SPARK-6103][Graphx]remove unused class to import in EdgeRDDImpl

Class TaskContext is unused in EdgeRDDImpl, so we need to remove it from the import list.

Author: Lianhui Wang

Closes #4846 from lianhuiwang/SPARK-6103 and squashes the following commits:

31aed64 [Lianhui Wang] remove unused class to import in EdgeRDDImpl
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 56cb41661e300..43a3aea0f6196 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl

 import scala.reflect.{classTag, ClassTag}

-import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
+import org.apache.spark.{OneToOneDependency, HashPartitioner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
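Unused imports like this one tend to creep back in, and the compiler can flag them mechanically. A sketch of the relevant sbt setting, assuming a build on Scala 2.11 or later (the flag was only added in 2.11, so it is not available on the 2.10 toolchain this file was built against at the time):

```
// build.sbt: warn whenever an import is never referenced;
// combine with "-Xfatal-warnings" to turn the warning into an error
scalacOptions += "-Ywarn-unused-import"
```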