Skip to content

Commit

Permalink
clean up dstream.py
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent c462bb3 commit 4d40d63
Showing 1 changed file with 1 addition and 10 deletions.
11 changes: 1 addition & 10 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):

def count(self):
    """
    Count elements by tallying each input iterator and summing the tallies.

    ``_mapPartitions`` is given a function that consumes the iterator it
    receives (presumably one iterator per partition -- confirm against
    ``_mapPartitions``) and yields a one-element list holding the number
    of items seen. The object produced by that step then has ``_sum()``
    called on it, and that result is returned unchanged.
    """
    # TODO: verify this count implementation; it differs from what
    # pyspark does.
    def tally(items):
        # Walk the iterator once, adding 1 per item, without storing items.
        return [sum(1 for _ in items)]

    per_iterator_totals = self._mapPartitions(tally)
    return per_iterator_totals._sum()

def _sum(self):
Expand Down Expand Up @@ -79,7 +77,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):

def reduce(self, func):
    """
    Combine the elements with ``func`` by routing them through
    ``reduceByKey``.

    Every element is first paired with the key ``None`` so that all of
    them share a single key. ``reduceByKey`` is then called with ``func``
    and ``1`` (presumably the number of partitions -- confirm against
    ``reduceByKey``'s signature). Finally the key is dropped again by
    keeping only index 1 of each resulting pair.

    :param func: function passed through to ``reduceByKey`` to merge
                 values.
    :return: whatever the final ``map`` call returns.
    """
    keyed = self.map(lambda element: (None, element))
    merged = keyed.reduceByKey(func, 1)
    return merged.map(lambda pair: pair[1])

Expand Down Expand Up @@ -107,12 +104,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
def combineLocally(iterator):
combiners = {}
for x in iterator:

#TODO for count operation make sure count implementation
# This is different from what pyspark does
#if isinstance(x, int):
# x = ("", x)

(k, v) = x
if k not in combiners:
combiners[k] = createCombiner(v)
Expand Down Expand Up @@ -142,6 +133,7 @@ def partitionBy(self, numPartitions, partitionFunc=None):

if partitionFunc is None:
partitionFunc = lambda x: 0 if x is None else hash(x)

# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
Expand Down Expand Up @@ -228,7 +220,6 @@ def takeAndPrint(rdd, time):

self.foreachRDD(takeAndPrint)


#def transform(self, func):
# from utils import RDDFunction
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
Expand Down

0 comments on commit 4d40d63

Please sign in to comment.