From 28aa56dbc6f7d07c86aa5c9095c6cd9c43d99e8f Mon Sep 17 00:00:00 2001 From: giwa Date: Mon, 4 Aug 2014 09:47:48 -0700 Subject: [PATCH] WIP --- .../main/python/streaming/test_oprations.py | 24 +++++++++++++++++++ python/pyspark/streaming/dstream.py | 1 - .../streaming/api/python/PythonDStream.scala | 3 ++- 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 examples/src/main/python/streaming/test_oprations.py diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py new file mode 100644 index 0000000000000..cb338ced5f228 --- /dev/null +++ b/examples/src/main/python/streaming/test_oprations.py @@ -0,0 +1,24 @@ +import sys +from operator import add + +from pyspark.conf import SparkConf +from pyspark.streaming.context import StreamingContext +from pyspark.streaming.duration import * + +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, "Usage: wordcount " + exit(-1) + conf = SparkConf() + conf.setAppName("PythonStreamingNetworkWordCount") + ssc = StreamingContext(conf=conf, duration=Seconds(1)) + + lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) + words = lines.flatMap(lambda line: line.split(" ")) + mapped_words = words.map(lambda word: (word, 1)) + count = mapped_words.reduceByKey(add) + + count.pyprint() + ssc.start() +# ssc.awaitTermination() + ssc.stop() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 7233ae5249e6d..c5452b952cac4 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -120,7 +120,6 @@ def _mergeCombiners(iterator): combiners[k] = v else: combiners[k] = mergeCombiners(combiners[k], v) - return combiners.iteritems() return shuffled._mapPartitions(_mergeCombiners) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 751b7504f1cea..59ac8ffa7924b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -124,4 +124,5 @@ class PythonTransformedDStream( val asJavaDStream = JavaDStream.fromDStream(this) } -*/ \ No newline at end of file +*/ +