From 02f618a3967c39656b772bfa6b384779bbbad1d7 Mon Sep 17 00:00:00 2001
From: Ken Takagiwa
Date: Wed, 16 Jul 2014 16:23:08 -0700
Subject: [PATCH] initial commit for socketTextStream

---
 .../python/streaming/nerwork_wordcount.py     | 36 +++++++++++++++++++
 1 file changed, 36 insertions(+)
 create mode 100644 examples/src/main/python/streaming/nerwork_wordcount.py

diff --git a/examples/src/main/python/streaming/nerwork_wordcount.py b/examples/src/main/python/streaming/nerwork_wordcount.py
new file mode 100644
index 0000000000000..2e5048ccad213
--- /dev/null
+++ b/examples/src/main/python/streaming/nerwork_wordcount.py
@@ -0,0 +1,36 @@
+"""Read text from a TCP socket and print word-level views of the stream.
+
+Usage: wordcount <hostname> <port>
+
+Every line received is split on single spaces. pyprint() is then called on
+three derived streams: all words, the words containing "Spark", and
+(word, 1) pairs.
+"""
+
+import sys
+from operator import add
+
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.duration import Seconds
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        # Write to stderr directly so the message works on Python 2 and 3.
+        sys.stderr.write("Usage: wordcount <hostname> <port>\n")
+        sys.exit(-1)
+    ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1))
+
+    # sys.argv entries are strings; the port is converted to an int here.
+    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
+    # Split each line on single spaces into words.
+    fm_lines = lines.flatMap(lambda x: x.split(" "))
+    # Keep only the words containing the substring "Spark".
+    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
+    # Pair each word with the count 1.
+    mapped_lines = fm_lines.map(lambda x: (x, 1))
+
+    fm_lines.pyprint()
+    filtered_lines.pyprint()
+    mapped_lines.pyprint()
+    ssc.start()
+    ssc.awaitTermination()