From ee50c5a9f76b2c10a6ba430be9d0b3ae01deed85 Mon Sep 17 00:00:00 2001
From: giwa
Date: Sun, 31 Aug 2014 20:48:10 +0900
Subject: [PATCH] added atexit to handle callback server

---
 python/pyspark/streaming/context.py | 28 +++++++-----
 python/pyspark/streaming/tests.py   | 68 ++++++++++++++++++++++++++++-
 2 files changed, 83 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index bbb4f6764e266..19e97f38861a6 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -17,6 +17,8 @@
 
 import sys
 from signal import signal, SIGTERM, SIGINT
+import atexit
+import time
 
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.context import SparkContext
@@ -73,7 +75,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         # Callback sever is need only by SparkStreming; therefore the callback sever
         # is started in StreamingContext.
         SparkContext._gateway.restart_callback_server()
-        self._set_clean_up_trigger()
+        self._set_clean_up_handler()
         self._jvm = self._sc._jvm
         self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
 
@@ -81,21 +83,22 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
     def _initialize_context(self, jspark_context, jduration):
         return self._jvm.JavaStreamingContext(jspark_context, jduration)
 
-    def _set_clean_up_trigger(self):
-        """Kill py4j callback server properly using signal lib"""
+    def _set_clean_up_handler(self):
+        """Set the clean-up handler using atexit."""
 
         def clean_up_handler(*args):
-            # Make sure stop callback server.
             SparkContext._gateway.shutdown()
-            sys.exit(0)
 
+        atexit.register(clean_up_handler)
+        # atexit handlers are not called when the program is killed by a signal that is
+        # not handled by Python, so the handler is also registered for SIGINT and SIGTERM.
         for sig in (SIGINT, SIGTERM):
             signal(sig, clean_up_handler)
 
     @property
     def sparkContext(self):
         """
-        Return SparkContext which is associated this StreamingContext
+        Return SparkContext which is associated with this StreamingContext.
         """
         return self._sc
 
@@ -152,11 +155,14 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
         Stop the execution of the streams immediately (does not wait for all
         received data to be processed).
         """
-        try:
-            self._jssc.stop(stopSparkContext, stopGraceFully)
-        finally:
-            SparkContext._gateway.shutdown()
+        self._jssc.stop(stopSparkContext, stopGraceFully)
+        if stopSparkContext:
+            self._sc.stop()
 
+        # Shut down only the callback server here; the whole py4j client is shut down
+        # by the clean-up handler registered with atexit.
+        SparkContext._gateway._shutdown_callback_server()
+
     def _testInputStream(self, test_inputs, numSlices=None):
         """
         This function is only for unittest.
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 3c9174e64cf77..d7f86fc8f5923 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -33,6 +33,7 @@
 import unittest
 
 from pyspark.context import SparkContext
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
@@ -47,8 +48,6 @@ def tearDown(self):
         # we do not wait to shutdown py4j client.
         self.ssc._jssc.stop()
         self.ssc._sc.stop()
-        # Why does it long time to terminate StremaingContext and SparkContext?
-        # Should we change the sleep time if this depends on machine spec?
         time.sleep(1)
 
     @classmethod
@@ -455,6 +454,71 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
 
         return result
 
+
+class TestStreamingContextSuite(unittest.TestCase):
+    """
+    Should we have a conf property in SparkContext?
+    @property
+    def conf(self):
+        return self._conf
+
+    """
+    def setUp(self):
+        self.master = "local[2]"
+        self.appName = self.__class__.__name__
+        self.batchDuration = Milliseconds(500)
+        self.sparkHome = "SomeDir"
+        self.envPair = {"key": "value"}
+
+    def tearDown(self):
+        # Do not call pyspark.streaming.context.StreamingContext.stop directly because
+        # we do not wait for the py4j client to shut down.
+        self.ssc._jssc.stop()
+        self.ssc._sc.stop()
+        # Why does it take so long to terminate the StreamingContext and SparkContext?
+        # Should we change the sleep time if this depends on the machine spec?
+        time.sleep(1)
+
+    @classmethod
+    def tearDownClass(cls):
+        # Make sure to shut down the callback server
+        SparkContext._gateway._shutdown_callback_server()
+
+
+    def test_from_no_conf_constructor(self):
+        ssc = StreamingContext(master=self.master, appName=self.appName, duration=self.batchDuration)
+        # Alternative way to read the master: ssc.sparkContext.master
+        # This is kept close to the Scala code.
+        self.assertEqual(ssc.sparkContext._conf.get("spark.master"), self.master)
+        self.assertEqual(ssc.sparkContext._conf.get("spark.app.name"), self.appName)
+
+    def test_from_no_conf_plus_spark_home(self):
+        ssc = StreamingContext(master=self.master, appName=self.appName,
+                               sparkHome=self.sparkHome, duration=self.batchDuration)
+        self.assertEqual(ssc.sparkContext._conf.get("spark.home"), self.sparkHome)
+
+    def test_from_existing_spark_context(self):
+        sc = SparkContext(master=self.master, appName=self.appName)
+        ssc = StreamingContext(sparkContext=sc)
+
+    def test_existing_spark_context_with_settings(self):
+        conf = SparkConf()
+        conf.set("spark.cleaner.ttl", "10")
+        sc = SparkContext(master=self.master, appName=self.appName, conf=conf)
+        ssc = StreamingContext(context=sc)
+        self.assertEqual(int(ssc.sparkContext._conf.get("spark.cleaner.ttl")), 10)
+
+    def _addInputStream(self, s):
+        test_inputs = map(lambda x: range(1, x), range(5, 101))
+        # make sure numSlices is 2 due to a deserializer problem in pyspark
+        s._testInputStream(test_inputs, 2)
+
+
+
 if __name__ == "__main__":
     unittest.main()
     SparkContext._gateway._shutdown_callback_server()
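
For reference, the clean-up pattern this patch wires into `_set_clean_up_handler` (one handler registered both with `atexit` and for SIGINT/SIGTERM, so the callback server is stopped however the process exits) can be tried in isolation. The sketch below is a minimal standalone illustration, not PySpark code: `CallbackServerStub`, `_gateway`, and the explicit `sys.exit(0)` on the signal path are assumptions made for the example, the last one so the script still terminates on Ctrl-C, which the patch itself does not do.

```python
import atexit
import signal
import sys


class CallbackServerStub(object):
    """Hypothetical stand-in for the py4j gateway used by StreamingContext."""

    def __init__(self):
        self._stopped = False

    def shutdown(self):
        # Guard so that running the handler from both atexit and a signal
        # does not shut the server down twice.
        if not self._stopped:
            self._stopped = True
            print("callback server shut down")


_gateway = CallbackServerStub()


def clean_up_handler(*args):
    # *args absorbs the (signum, frame) pair the signal module passes in;
    # atexit calls the handler with no arguments.
    _gateway.shutdown()
    if args:
        # Invoked as a signal handler: exit explicitly so SIGINT/SIGTERM
        # still terminate the process after clean-up has run.
        sys.exit(0)


# atexit covers normal interpreter exits; the explicit signal handlers cover
# SIGINT/SIGTERM, which would otherwise bypass atexit.
atexit.register(clean_up_handler)
for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, clean_up_handler)

if __name__ == "__main__":
    print("press Ctrl-C or let the script finish")
```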