From 268a6a567f7a00ad990cd8c70946557b733c654c Mon Sep 17 00:00:00 2001 From: giwa Date: Tue, 19 Aug 2014 15:33:04 -0700 Subject: [PATCH] Changed awaitTermination not to call awaitTermincation in Scala. Just use time.sleep instread --- python/pyspark/streaming/context.py | 4 +++- python/pyspark/streaming_tests.py | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 3f455a3e06072..5d6740893ada5 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -16,6 +16,7 @@ # import sys +import time from signal import signal, SIGTERM, SIGINT from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer @@ -102,11 +103,12 @@ def start(self): def awaitTermination(self, timeout=None): """ Wait for the execution to stop. + timeout is milliseconds """ if timeout is None: self._jssc.awaitTermination() else: - self._jssc.awaitTermination(timeout) + time.sleep(timeout/1000) #TODO: add storageLevel def socketTextStream(self, hostname, port): diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index 8396c4f960e81..2964107f2d92e 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -48,7 +48,7 @@ def tearDown(self): self.ssc._sc.stop() # Why does it long time to terminate StremaingContext and SparkContext? # Should we change the sleep time if this depends on machine spec? - time.sleep(10) + time.sleep(1) @classmethod def tearDownClass(cls): @@ -436,7 +436,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): # Check time out. if (current_time - start_time) > self.timeout: break - self.ssc.awaitTermination(50) + #self.ssc.awaitTermination(50) + time.sleep(0.05) # Check if the output is the same length of expexted output. if len(expected_output) == len(result): break