From 0375a413b8a009f5820897691570a1273ee25b97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E9=B9=8F?= Date: Thu, 26 Feb 2015 23:05:56 -0800 Subject: [PATCH 1/6] fix spark-6033, clarify the spark.worker.cleanup behavior in standalone mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit jira case spark-6033 https://issues.apache.org/jira/browse/SPARK-6033 In standalone deploy mode, the cleanup will only remove the stopped application's directories. The original description about the cleanup behavior is incorrect. Author: 许鹏 Closes #4803 from hseagle/spark-6033 and squashes the following commits: 927a6a0 [许鹏] fix the incorrect description about the spark.worker.cleanup in standalone mode --- docs/spark-standalone.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 5c6084fb46255..74d8653a8b845 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -222,8 +222,7 @@ SPARK_WORKER_OPTS supports the following system properties: false Enable periodic cleanup of worker / application directories. Note that this only affects standalone - mode, as YARN works differently. Applications directories are cleaned up regardless of whether - the application is still running. + mode, as YARN works differently. Only the directories of stopped applications are cleaned up. From 8cd1692c9092150107bed27777951633cbf945f6 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Thu, 26 Feb 2015 23:11:43 -0800 Subject: [PATCH 2/6] [SPARK-6036][CORE] avoid race condition between eventlogListener and akka actor system For detail description, pls refer to [SPARK-6036](https://issues.apache.org/jira/browse/SPARK-6036). Author: Zhang, Liye Closes #4785 from liyezhang556520/EventLogInProcess and squashes the following commits: 8b0b0a6 [Zhang, Liye] stop listener after DAGScheduler 79b15b3 [Zhang, Liye] SPARK-6036 avoid race condition between eventlogListener and akka actor system --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d3948d4e6d91b..3cd0c218a36fd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1389,17 +1389,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli stopped = true env.metricsSystem.report() metadataCleaner.cancel() - env.actorSystem.stop(heartbeatReceiver) cleaner.foreach(_.stop()) dagScheduler.stop() dagScheduler = null + listenerBus.stop() + eventLogger.foreach(_.stop()) + env.actorSystem.stop(heartbeatReceiver) progressBar.foreach(_.stop()) taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) - listenerBus.stop() - eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") SparkContext.clearActiveContext() } else { From e747e98490f8ede23b0a9e0795e7445d0b597624 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 27 Feb 2015 13:31:46 +0000 Subject: [PATCH 3/6] [SPARK-6058][Yarn] Log the user class exception in ApplicationMaster Because ApplicationMaster doesn't set SparkUncaughtExceptionHandler, the exception in the user class won't be logged. This PR added a `logError` for it. Author: zsxwing Closes #4813 from zsxwing/SPARK-6058 and squashes the following commits: 806c932 [zsxwing] Log the user class exception --- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 20fc19166ac4e..796422bc25b59 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -486,11 +486,10 @@ private[spark] class ApplicationMaster( case _: InterruptedException => // Reporter thread can interrupt to stop user class case cause: Throwable => + logError("User class threw exception: " + cause.getMessage, cause) finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, "User class threw exception: " + cause.getMessage) - // re-throw to get it logged - throw cause } } } From 57566d0af3008982a1e24a763ed2f6a700b40f8f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 27 Feb 2015 13:33:39 +0000 Subject: [PATCH 4/6] [SPARK-6059][Yarn] Add volatile to ApplicationMaster's reporterThread and allocator `ApplicationMaster.reporterThread` and `ApplicationMaster.allocator` are accessed in multiple threads, so they should be marked as `volatile`. Author: zsxwing Closes #4814 from zsxwing/SPARK-6059 and squashes the following commits: 17d9386 [zsxwing] Add volatile to ApplicationMaster's reporterThread and allocator --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 796422bc25b59..e966bfba7bb7d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -68,8 +68,8 @@ private[spark] class ApplicationMaster( @volatile private var finalMsg: String = "" @volatile private var userClassThread: Thread = _ - private var reporterThread: Thread = _ - private var allocator: YarnAllocator = _ + @volatile private var reporterThread: Thread = _ + @volatile private var allocator: YarnAllocator = _ // Fields used in client mode. private var actorSystem: ActorSystem = null From d17cb2ba33b363dd346ac5a5681e1757decd0f4d Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Fri, 27 Feb 2015 13:00:36 -0800 Subject: [PATCH 5/6] [SPARK-4587] [mllib] [docs] Fixed save,load calls in ML guide examples Should pass spark context to save/load CC: mengxr Author: Joseph K. Bradley Closes #4816 from jkbradley/ml-io-doc-fix and squashes the following commits: 83d369d [Joseph K. Bradley] added comment to save,load parts of ML guide examples 2841170 [Joseph K. Bradley] Fixed save,load calls in ML guide examples --- docs/mllib-collaborative-filtering.md | 10 ++++--- docs/mllib-decision-tree.md | 20 ++++++++------ docs/mllib-ensembles.md | 40 ++++++++++++++++----------- docs/mllib-linear-methods.md | 20 ++++++++------ docs/mllib-naive-bayes.md | 10 ++++--- 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index 935cd8dad3b25..27aa4d38b7617 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -97,8 +97,9 @@ val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => }.mean() println("Mean Squared Error = " + MSE) -model.save("myModelPath") -val sameModel = MatrixFactorizationModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = MatrixFactorizationModel.load(sc, "myModelPath") {% endhighlight %} If the rating matrix is derived from another source of information (e.g., it is inferred from @@ -186,8 +187,9 @@ public class CollaborativeFiltering { ).rdd()).mean(); System.out.println("Mean Squared Error = " + MSE); - model.save("myModelPath"); - MatrixFactorizationModel sameModel = MatrixFactorizationModel.load("myModelPath"); + // Save and load model + model.save(sc.sc(), "myModelPath"); + MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(sc.sc(), "myModelPath"); } } {% endhighlight %} diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index 4695d1cde4901..8e478ab035582 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -223,8 +223,9 @@ val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData. println("Test Error = " + testErr) println("Learned classification tree model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = DecisionTreeModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = DecisionTreeModel.load(sc, "myModelPath") {% endhighlight %} @@ -284,8 +285,9 @@ Double testErr = System.out.println("Test Error: " + testErr); System.out.println("Learned classification tree model:\n" + model.toDebugString()); -model.save("myModelPath"); -DecisionTreeModel sameModel = DecisionTreeModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +DecisionTreeModel sameModel = DecisionTreeModel.load(sc.sc(), "myModelPath"); {% endhighlight %} @@ -362,8 +364,9 @@ val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean println("Test Mean Squared Error = " + testMSE) println("Learned regression tree model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = DecisionTreeModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = DecisionTreeModel.load(sc, "myModelPath") {% endhighlight %} @@ -429,8 +432,9 @@ Double testMSE = System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); -model.save("myModelPath"); -DecisionTreeModel sameModel = DecisionTreeModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +DecisionTreeModel sameModel = DecisionTreeModel.load(sc.sc(), "myModelPath"); {% endhighlight %} diff --git a/docs/mllib-ensembles.md b/docs/mllib-ensembles.md index ddae84165f8a9..ec1ef38b453d3 100644 --- a/docs/mllib-ensembles.md +++ b/docs/mllib-ensembles.md @@ -129,8 +129,9 @@ val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData. println("Test Error = " + testErr) println("Learned classification forest model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = RandomForestModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = RandomForestModel.load(sc, "myModelPath") {% endhighlight %} @@ -193,8 +194,9 @@ Double testErr = System.out.println("Test Error: " + testErr); System.out.println("Learned classification forest model:\n" + model.toDebugString()); -model.save("myModelPath"); -RandomForestModel sameModel = RandomForestModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +RandomForestModel sameModel = RandomForestModel.load(sc.sc(), "myModelPath"); {% endhighlight %} @@ -276,8 +278,9 @@ val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean println("Test Mean Squared Error = " + testMSE) println("Learned regression forest model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = RandomForestModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = RandomForestModel.load(sc, "myModelPath") {% endhighlight %} @@ -343,8 +346,9 @@ Double testMSE = System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression forest model:\n" + model.toDebugString()); -model.save("myModelPath"); -RandomForestModel sameModel = RandomForestModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +RandomForestModel sameModel = RandomForestModel.load(sc.sc(), "myModelPath"); {% endhighlight %} @@ -504,8 +508,9 @@ val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData. println("Test Error = " + testErr) println("Learned classification GBT model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = GradientBoostedTreesModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath") {% endhighlight %} @@ -568,8 +573,9 @@ Double testErr = System.out.println("Test Error: " + testErr); System.out.println("Learned classification GBT model:\n" + model.toDebugString()); -model.save("myModelPath"); -GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(sc.sc(), "myModelPath"); {% endhighlight %} @@ -647,8 +653,9 @@ val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean println("Test Mean Squared Error = " + testMSE) println("Learned regression GBT model:\n" + model.toDebugString) -model.save("myModelPath") -val sameModel = GradientBoostedTreesModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath") {% endhighlight %} @@ -717,8 +724,9 @@ Double testMSE = System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression GBT model:\n" + model.toDebugString()); -model.save("myModelPath"); -GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(sc.sc(), "myModelPath"); {% endhighlight %} diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index d9fc63b37d116..ffbd7ef1bff51 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -223,8 +223,9 @@ val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC) -model.save("myModelPath") -val sameModel = SVMModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = SVMModel.load(sc, "myModelPath") {% endhighlight %} The `SVMWithSGD.train()` method by default performs L2 regularization with the @@ -308,8 +309,9 @@ public class SVMClassifier { System.out.println("Area under ROC = " + auROC); - model.save("myModelPath"); - SVMModel sameModel = SVMModel.load("myModelPath"); + // Save and load model + model.save(sc.sc(), "myModelPath"); + SVMModel sameModel = SVMModel.load(sc.sc(), "myModelPath"); } } {% endhighlight %} @@ -423,8 +425,9 @@ val valuesAndPreds = parsedData.map { point => val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean() println("training Mean Squared Error = " + MSE) -model.save("myModelPath") -val sameModel = LinearRegressionModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = LinearRegressionModel.load(sc, "myModelPath") {% endhighlight %} [`RidgeRegressionWithSGD`](api/scala/index.html#org.apache.spark.mllib.regression.RidgeRegressionWithSGD) @@ -496,8 +499,9 @@ public class LinearRegression { ).rdd()).mean(); System.out.println("training Mean Squared Error = " + MSE); - model.save("myModelPath"); - LinearRegressionModel sameModel = LinearRegressionModel.load("myModelPath"); + // Save and load model + model.save(sc.sc(), "myModelPath"); + LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "myModelPath"); } } {% endhighlight %} diff --git a/docs/mllib-naive-bayes.md b/docs/mllib-naive-bayes.md index 81173255b590d..5224a0b49a991 100644 --- a/docs/mllib-naive-bayes.md +++ b/docs/mllib-naive-bayes.md @@ -56,8 +56,9 @@ val model = NaiveBayes.train(training, lambda = 1.0) val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count() -model.save("myModelPath") -val sameModel = NaiveBayesModel.load("myModelPath") +// Save and load model +model.save(sc, "myModelPath") +val sameModel = NaiveBayesModel.load(sc, "myModelPath") {% endhighlight %} @@ -97,8 +98,9 @@ double accuracy = predictionAndLabel.filter(new Function, } }).count() / (double) test.count(); -model.save("myModelPath"); -NaiveBayesModel sameModel = NaiveBayesModel.load("myModelPath"); +// Save and load model +model.save(sc.sc(), "myModelPath"); +NaiveBayesModel sameModel = NaiveBayesModel.load(sc.sc(), "myModelPath"); {% endhighlight %} From 5f7f3b938e1776168be866fc9ee87dc7494696cc Mon Sep 17 00:00:00 2001 From: Saisai Shao Date: Fri, 27 Feb 2015 13:01:42 -0800 Subject: [PATCH 6/6] [Streaming][Minor] Remove useless type signature of Java Kafka direct stream API cc tdas . Author: Saisai Shao Closes #4817 from jerryshao/signature-minor-fix and squashes the following commits: eebfaac [Saisai Shao] Remove useless type parameter --- .../scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 62a659518943d..5a9bd4214cf51 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -512,7 +512,7 @@ object KafkaUtils { * @param topics Names of the topics to consume */ @Experimental - def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V],