From ea9c8731b3d997ead7015d721c66231064e19ff9 Mon Sep 17 00:00:00 2001 From: giwa Date: Fri, 15 Aug 2014 22:30:58 -0700 Subject: [PATCH] added TODO coments --- python/pyspark/streaming/context.py | 3 ++- python/pyspark/streaming/dstream.py | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 691f9b06ad4e9..470ed270cdbfb 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -17,7 +17,6 @@ import sys from signal import signal, SIGTERM, SIGINT -from tempfile import NamedTemporaryFile from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer from pyspark.context import SparkContext @@ -79,6 +78,7 @@ def _clean_up_trigger(self): """Kill py4j callback server properly using signal lib""" def clean_up_handler(*args): + SparkContext._gateway._shutdown_callback_server() SparkContext._gateway.shutdown() sys.exit(0) @@ -128,6 +128,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False): self._jssc.stop(stopSparkContext, stopGraceFully) finally: # Stop Callback server + SparkContext._gateway._shutdown_callback_server() SparkContext._gateway.shutdown() def _testInputStream(self, test_inputs, numSlices=None): diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 679360dbca08d..ef0e2258e9922 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -376,15 +376,27 @@ def saveAsTextFile(rdd, time): return self.foreachRDD(saveAsTextFile) +# TODO: implement updateStateByKey +# TODO: implement slice + +# Window Operations +# TODO: implement window +# TODO: implement groupByKeyAndWindow +# TODO: implement reduceByKeyAndWindow +# TODO: implement countByValueAndWindow +# TODO: implement countByWindow +# TODO: implement reduceByWindow + # Following operation has dependency to transform -# TODO: impelment union +# TODO: implement transform +# TODO: implement transformWith +# TODO: implement union # TODO: implement repertitions # TODO: implement cogroup # TODO: implement join # TODO: implement leftOuterJoin # TODO: implemtnt rightOuterJoin - class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():