diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 32d6c3e45eb54..66024d539ce5c 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -17,7 +17,6 @@ from collections import defaultdict from itertools import chain, ifilter, imap -import time import operator from pyspark.serializers import NoOpSerializer,\ @@ -246,8 +245,6 @@ def takeAndPrint(rdd, time): taken = rdd.take(11) print "-------------------------------------------" print "Time: %s" % (str(time)) - print rdd.glom().collect() - print "-------------------------------------------" print "-------------------------------------------" for record in taken[:10]: print record @@ -447,6 +444,7 @@ def pipeline_func(split, iterator): self._prev_jdstream = prev._prev_jdstream # maintain the pipeline self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer self.is_cached = False + self.is_checkpointed = False self._ssc = prev._ssc self.ctx = prev.ctx self.prev = prev @@ -483,4 +481,4 @@ def _jdstream(self): return self._jdstream_val def _is_pipelinable(self): - return not self.is_cached + return not (self.is_cached or self.is_checkpointed)