Skip to content

Commit

Permalink
added reducedByKey not working yet
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Aug 18, 2014
1 parent 6e0d9c7 commit 9af03f4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
10 changes: 9 additions & 1 deletion examples/src/main/python/streaming/wordcount.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import sys
from operator import add

from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <directory>"
exit(-1)
ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
conf = SparkConf()
conf.setAppName("PythonStreamingWordCount")
conf.set("spark.default.parallelism", 1)

# ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
ssc = StreamingContext(conf=conf, duration=Seconds(1))

lines = ssc.textFileStream(sys.argv[1])
fm_lines = lines.flatMap(lambda x: x.split(" "))
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
reduced_lines = mapped_lines.reduce(add)

fm_lines.pyprint()
filtered_lines.pyprint()
mapped_lines.pyprint()
reduced_lines.pyprint()
ssc.start()
ssc.awaitTermination()
27 changes: 25 additions & 2 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

__all__ = ["DStream"]


class DStream(object):
def __init__(self, jdstream, ssc, jrdd_deserializer):
self._jdstream = jdstream
Expand Down Expand Up @@ -149,7 +150,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
"""
"""
if numPartitions is None:
numPartitions = self.ctx._defaultParallelism()
numPartitions = self._defaultReducePartitions()
def combineLocally(iterator):
combiners = {}
for x in iterator:
Expand Down Expand Up @@ -211,7 +212,6 @@ def add_shuffle_key(split, iterator):
return dstream



def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc):
"""
"""
Expand Down Expand Up @@ -254,8 +254,31 @@ def wrapRDD(self, rdd):
raise NotImplementedError

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
"""
# hard code to avoid the error
return 2
if self.ctx._conf.contains("spark.default.parallelism"):
return self.ctx.defaultParallelism
else:
return self.getNumPartitions()

def getNumPartitions(self):
"""
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.getNumPartitions()
2
"""
return self._jdstream.partitions().size()


class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag](
}
}

/*

private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
DStream[(Long, Array[Byte])](prev.ssc){
override def dependencies = List(prev)
Expand All @@ -144,9 +144,9 @@ DStream[(Long, Array[Byte])](prev.ssc){
case None => None
}
}
val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}
*/




Expand Down

0 comments on commit 9af03f4

Please sign in to comment.