diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 088a4965b6b13..eee298badcbad 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -140,6 +140,8 @@ def _testInputStream(self, test_inputs, numSlices=None): """ Generate multiple files to make "stream" in Scala side for test. Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile. + + QueStream maybe good way to implement this function """ numSlices = numSlices or self._sc.defaultParallelism # Calling the Java parallelize() method with an ArrayList is too slow, diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 07429f477d310..02f35fac47ac0 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -35,25 +35,31 @@ def __init__(self, jdstream, ssc, jrdd_deserializer): self.ctx = ssc._sc self._jrdd_deserializer = jrdd_deserializer + def context(self): + """ + Return the StreamingContext associated with this DStream + """ + return self._ssc + def count(self): """ Return a new DStream which contains the number of elements in this DStream. """ - return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum() + return self.mapPartitions(lambda i: [sum(1 for _ in i)])._sum() def _sum(self): """ Add up the elements in this DStream. """ - return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) def print_(self, label=None): """ Since print is reserved name for python, we cannot define a "print" method function. This function prints serialized data in RDD in DStream because Scala and Java cannot - deserialized pickled python object. Please use DStream.pyprint() instead to print results. + deserialized pickled python object. Please use DStream.pyprint() to print results. - Call DStream.print(). + Call DStream.print() and this function will print byte array in the DStream """ # a hack to call print function in DStream getattr(self._jdstream, "print")(label) @@ -63,29 +69,32 @@ def filter(self, f): Return a new DStream containing only the elements that satisfy predicate. """ def func(iterator): return ifilter(f, iterator) - return self._mapPartitions(func) + return self.mapPartitions(func) def flatMap(self, f, preservesPartitioning=False): """ Pass each value in the key-value pair DStream through flatMap function without changing the keys: this also retains the original RDD's partition. """ - def func(s, iterator): return chain.from_iterable(imap(f, iterator)) + def func(s, iterator): + return chain.from_iterable(imap(f, iterator)) return self._mapPartitionsWithIndex(func, preservesPartitioning) - def map(self, f): + def map(self, f, preservesPartitioning=False): """ Return a new DStream by applying a function to each element of DStream. """ - def func(iterator): return imap(f, iterator) - return self._mapPartitions(func) + def func(iterator): + return imap(f, iterator) + return self.mapPartitions(func, preservesPartitioning) - def _mapPartitions(self, f): + def mapPartitions(self, f, preservesPartitioning=False): """ Return a new DStream by applying a function to each partition of this DStream. """ - def func(s, iterator): return f(iterator) - return self._mapPartitionsWithIndex(func) + def func(s, iterator): + return f(iterator) + return self._mapPartitionsWithIndex(func, preservesPartitioning) def _mapPartitionsWithIndex(self, f, preservesPartitioning=False): """ @@ -131,7 +140,7 @@ def combineLocally(iterator): else: combiners[k] = mergeValue(combiners[k], v) return combiners.iteritems() - locally_combined = self._mapPartitions(combineLocally) + locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) def _mergeCombiners(iterator): @@ -143,7 +152,7 @@ def _mergeCombiners(iterator): combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() - return shuffled._mapPartitions(_mergeCombiners) + return shuffled.mapPartitions(_mergeCombiners) def partitionBy(self, numPartitions, partitionFunc=None): """ @@ -246,6 +255,34 @@ def takeAndPrint(rdd, time): self.foreachRDD(takeAndPrint) + def mapValues(self, f): + """ + Pass each value in the key-value pair RDD through a map function + without changing the keys; this also retains the original RDD's + partitioning. + """ + map_values_fn = lambda (k, v): (k, f(v)) + return self.map(map_values_fn, preservesPartitioning=True) + + def flatMapValues(self, f): + """ + Pass each value in the key-value pair RDD through a flatMap function + without changing the keys; this also retains the original RDD's + partitioning. + """ + flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) + return self.flatMap(flat_map_fn, preservesPartitioning=True) + + def glom(self): + """ + Return a new DStream in which RDD is generated by applying glom() to RDD of + this DStream. Applying glom() to an RDD coalesces all elements within each partition into + an list. + """ + def func(iterator): + yield list(iterator) + return self.mapPartitions(func) + #def transform(self, func): - TD # from utils import RDDFunction # wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func) @@ -255,7 +292,7 @@ def takeAndPrint(rdd, time): def _test_output(self, result): """ This function is only for test case. - Store data in a DStream to result to verify the result in tese case + Store data in a DStream to result to verify the result in test case """ def get_output(rdd, time): taken = rdd.collect() @@ -318,4 +355,4 @@ def _jdstream(self): return self._jdstream_val def _is_pipelinable(self): - return not (self.is_cached) + return not self.is_cached diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index ec45acec94dbf..25ea350ca425f 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -142,10 +142,54 @@ def test_func(dstream): output = self._run_stream(test_input, test_func, expected_output) self.assertEqual(expected_output, output) - def _run_stream(self, test_input, test_func, expected_output): + def test_mapValues(self): + """Basic operation test for DStream.mapValues""" + test_input = [["a", "a", "b"], ["", ""], []] + + def test_func(dstream): + return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10) + expected_output = [[("a", 12), ("b", 11)], [("", 12)], []] + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_flatMapValues(self): + """Basic operation test for DStream.flatMapValues""" + test_input = [["a", "a", "b"], ["", ""], []] + + def test_func(dstream): + return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10)) + expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []] + output = self._run_stream(test_input, test_func, expected_output) + self.assertEqual(expected_output, output) + + def test_glom(self): + """Basic operation test for DStream.glom""" + test_input = [range(1, 5), range(5, 9), range(9, 13)] + numSlices = 2 + + def test_func(dstream): + dstream.pyprint() + return dstream.glom() + expected_output = [[[1,2], [3,4]],[[5,6], [7,8]],[[9,10], [11,12]]] + output = self._run_stream(test_input, test_func, expected_output, numSlices) + self.assertEqual(expected_output, output) + + def test_mapPartitions(self): + """Basic operation test for DStream.mapPartitions""" + test_input = [range(1, 5), range(5, 9), range(9, 13)] + numSlices = 2 + + def test_func(dstream): + dstream.pyprint() + return dstream.mapPartitions(lambda x: reduce(operator.add, x)) + expected_output = [[3, 7],[11, 15],[19, 23]] + output = self._run_stream(test_input, test_func, expected_output, numSlices) + self.assertEqual(expected_output, output) + + def _run_stream(self, test_input, test_func, expected_output, numSlices=None): """Start stream and return the output""" # Generate input stream with user-defined input - test_input_stream = self.ssc._testInputStream(test_input) + test_input_stream = self.ssc._testInputStream(test_input, numSlices) # Apply test function to stream test_stream = test_func(test_input_stream) # Add job to get output from stream