From 74df565e26e9bf7b107cc678e1668dfda7d534ef Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 27 Sep 2014 00:48:03 -0700
Subject: [PATCH] fix print and docs

---
 python/pyspark/streaming/dstream.py | 56 ++++++++++++-----------------
 1 file changed, 23 insertions(+), 33 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8c79eece773ce..01ca56a7a0387 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -17,6 +17,7 @@
 
 from itertools import chain, ifilter, imap
 import operator
+from datetime import datetime
 
 from pyspark import RDD
 from pyspark.storagelevel import StorageLevel
@@ -54,17 +55,6 @@ def sum(self):
         """
         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
-    def print_(self, label=None):
-        """
-        Since print is reserved name for python, we cannot define a "print" method function.
-        This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() to print results.
-
-        Call DStream.print() and this function will print byte array in the DStream
-        """
-        # a hack to call print function in DStream
-        getattr(self._jdstream, "print")(label)
-
     def filter(self, f):
         """
         Return a new DStream containing only the elements that satisfy predicate.
@@ -154,19 +144,15 @@ def foreachRDD(self, func):
         jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer)
         self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), jfunc)
 
-    def pyprint(self):
+    def pprint(self):
         """
         Print the first ten elements of each RDD generated in this DStream. This is an output
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
-            """
-            Closure to take element from RDD and print first 10 elements.
-            This closure is called by py4j callback server.
-            """
             taken = rdd.take(11)
             print "-------------------------------------------"
-            print "Time: %s" % (str(time))
+            print "Time: %s" % datetime.fromtimestamp(time / 1000.0)
             print "-------------------------------------------"
             for record in taken[:10]:
                 print record
@@ -176,6 +162,20 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
+    def collect(self):
+        """
+        Collect each RDD into the returned list.
+
+        :return: list, which will have the collected items.
+        """
+        result = []
+
+        def get_output(rdd, time):
+            r = rdd.collect()
+            result.append(r)
+        self.foreachRDD(get_output)
+        return result
+
     def mapValues(self, f):
         """
         Pass each value in the key-value pair RDD through a map function
@@ -196,9 +196,9 @@ def flatMapValues(self, f):
 
     def glom(self):
         """
-        Return a new DStream in which RDD is generated by applying glom() to RDD of
-        this DStream. Applying glom() to an RDD coalesces all elements within each partition into
-        an list.
+        Return a new DStream in which RDD is generated by applying glom()
+        to RDD of this DStream. Applying glom() to an RDD coalesces all
+        elements within each partition into a list.
         """
         def func(iterator):
             yield list(iterator)
@@ -228,11 +228,11 @@ def checkpoint(self, interval):
         Mark this DStream for checkpointing. It will be saved to a file inside the
         checkpoint directory set with L{SparkContext.setCheckpointDir()}
 
-        @param interval: Time interval after which generated RDD will be checkpointed
-               interval has to be pyspark.streaming.duration.Duration
+        @param interval: time in seconds, after which generated RDD will
+               be checkpointed
         """
         self.is_checkpointed = True
-        self._jdstream.checkpoint(interval._jduration)
+        self._jdstream.checkpoint(self._ssc._jduration(interval))
         return self
 
     def groupByKey(self, numPartitions=None):
@@ -245,7 +245,6 @@ def groupByKey(self, numPartitions=None):
         Note: If you are grouping in order to perform an aggregation (such as a
         sum or average) over each key, using reduceByKey will provide much better
         performance.
-
         """
         return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
 
@@ -288,15 +287,6 @@ def saveAsPickleFile(rdd, time):
 
         return self.foreachRDD(saveAsPickleFile)
 
-    def collect(self):
-        result = []
-
-        def get_output(rdd, time):
-            r = rdd.collect()
-            result.append(r)
-        self.foreachRDD(get_output)
-        return result
-
     def transform(self, func):
         return TransformedDStream(self, lambda a, t: func(a), True)
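
Usage sketch (not part of the patch): the snippet below illustrates how the renamed
pprint() and the seconds-based checkpoint() interval introduced here would be called.
`ssc` (a StreamingContext) and `lines` (a DStream of text lines) are assumed to exist
and are hypothetical names, not defined by this change; only DStream methods visible
in this file (mapPartitions, groupByKey, mapValues, checkpoint, pprint) are used.

    # `ssc` and `lines` are assumed, hypothetical inputs; this is an illustration,
    # not part of the patch.
    pairs = lines.mapPartitions(lambda part: ((w, 1) for line in part
                                              for w in line.split(" ")))
    # Group the 1s per word and sum them to get a per-batch word count.
    counts = pairs.groupByKey().mapValues(sum)

    counts.checkpoint(10)  # interval is now plain seconds, not a Duration object
    counts.pprint()        # renamed from pyprint(); prints up to 10 records per batch

    ssc.start()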