diff --git a/bin/spark-submit b/bin/spark-submit index ec4e10787cff0..a297714c67da0 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -42,10 +42,9 @@ DEPLOY_MODE=${DEPLOY_MODE:-"client"} # This will be removed after pyprint is moved to PythonDStream. # Problem is that print function is in (Scala)DStream. # Whenever python code is executed, we call PythonDStream which passes -# pythonExec(which python Spark should execute). +# pythonExec(which python Spark should execute). pythonExec is used to call python. # Since pyprint is located in DStream, Spark does not know which python should use. # In that case, get python path from PYSPARK_PYTHON, environmental variable. -# This fix is ongoing in print branch in https://github.com/giwa/spark/tree/print. # Figure out which Python executable to use if [[ -z "$PYSPARK_PYTHON" ]]; then @@ -53,6 +52,7 @@ if [[ -z "$PYSPARK_PYTHON" ]]; then fi export PYSPARK_PYTHON + if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY fi diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py index a1458e06f13d2..c6ededc24db21 100644 --- a/examples/src/main/python/streaming/network_wordcount.py +++ b/examples/src/main/python/streaming/network_wordcount.py @@ -11,20 +11,15 @@ exit(-1) conf = SparkConf() conf.setAppName("PythonStreamingNetworkWordCount") - conf.set("spark.default.parallelism", 1) ssc = StreamingContext(conf=conf, duration=Seconds(1)) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) fm_lines = lines.flatMap(lambda x: x.split(" ")) - filtered_lines = fm_lines.filter(lambda line: "Spark" in line) mapped_lines = fm_lines.map(lambda x: (x, 1)) - reduced_lines = mapped_lines.reduce(add) - counted_lines = reduced_lines.count() + reduced_lines = mapped_lines.reduceByKey(add) fm_lines.pyprint() - filtered_lines.pyprint() mapped_lines.pyprint() reduced_lines.pyprint() - counted_lines.pyprint() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py index 9ff8bc5ac9ab2..ee52c4e178142 100644 --- a/examples/src/main/python/streaming/wordcount.py +++ b/examples/src/main/python/streaming/wordcount.py @@ -21,7 +21,7 @@ fm_lines = lines.flatMap(lambda x: x.split(" ")) filtered_lines = fm_lines.filter(lambda line: "Spark" in line) mapped_lines = fm_lines.map(lambda x: (x, 1)) - reduced_lines = mapped_lines.reduce(add) + reduced_lines = mapped_lines.reduceByKey(add) fm_lines.pyprint() filtered_lines.pyprint() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index a640df7394bcf..08de8dbe9d542 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -22,13 +22,15 @@ def count(self): """ """ - #TODO make sure count implementation, thiis different from what pyspark does - return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1)) + pass + #TODO: make sure count implementation, thiis different from what pyspark does + #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1)) def _sum(self): """ """ - return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + pass + #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) def print_(self): """ @@ -85,7 +87,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False): """ return PipelinedDStream(self, f, preservesPartitioning) - def reduceByKey(self, func, numPartitions=None): """ Merge the value for each key using an associative reduce function. @@ -121,7 +122,7 @@ def combineLocally(iterator): else: combiners[k] = mergeValue(combiners[k], v) return combiners.iteritems() - locally_combined = self.mapPartitions(combineLocally) + locally_combined = self._mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) def _mergeCombiners(iterator): combiners = {} @@ -131,12 +132,11 @@ def _mergeCombiners(iterator): else: combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() - return shuffled.mapPartitions(_mergeCombiners) + return shuffled._mapPartitions(_mergeCombiners) def partitionBy(self, numPartitions, partitionFunc=None): """ Return a copy of the DStream partitioned using the specified partitioner. - """ if numPartitions is None: numPartitions = self.ctx._defaultReducePartitions() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index fc7a2055025c1..f539bc9aa147d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -623,7 +623,7 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } -//TODO move pyprint to PythonDStream and executed by py4j call back function +//TODO: move pyprint to PythonDStream and executed by py4j call back function /** * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output * operator, so this PythonDStream will be registered as an output stream and there materialized. @@ -647,6 +647,7 @@ abstract class DStream[T: ClassTag] ( // pythonExec should be passed from python. Move pyprint to PythonDStream val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") + val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") // Call python script to deserialize and print result in stdout val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)