Skip to content

Commit

Permalink
added atexit to handle callback server
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 31, 2014
1 parent fdc9125 commit ee50c5a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 13 deletions.
28 changes: 17 additions & 11 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import sys
from signal import signal, SIGTERM, SIGINT
import atexit
import time

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.context import SparkContext
Expand Down Expand Up @@ -73,29 +75,30 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
# Callback sever is need only by SparkStreming; therefore the callback sever
# is started in StreamingContext.
SparkContext._gateway.restart_callback_server()
self._set_clean_up_trigger()
self._set_clean_up_handler()
self._jvm = self._sc._jvm
self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

# Initialize StremaingContext in function to allow subclass specific initialization
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)

def _set_clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""
def _set_clean_up_handler(self):
""" set clean up hander using atexit """

def clean_up_handler(*args):
# Make sure stop callback server.
def clean_up_handler():
SparkContext._gateway.shutdown()
sys.exit(0)

atexit.register(clean_up_handler)
# atext is not called when the program is killed by a signal not handled by
# Python.
for sig in (SIGINT, SIGTERM):
signal(sig, clean_up_handler)

@property
def sparkContext(self):
"""
Return SparkContext which is associated this StreamingContext
Return SparkContext which is associated with this StreamingContext.
"""
return self._sc

Expand Down Expand Up @@ -152,11 +155,14 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
Stop the execution of the streams immediately (does not wait for all received data
to be processed).
"""
try:
self._jssc.stop(stopSparkContext, stopGraceFully)
finally:
SparkContext._gateway.shutdown()
self._jssc.stop(stopSparkContext, stopGraceFully)
if stopSparkContext:
self._sc.stop()

# Shutdown only callback server and all py3j client is shutdowned
# clean up handler
SparkContext._gateway._shutdown_callback_server()

def _testInputStream(self, test_inputs, numSlices=None):
"""
This function is only for unittest.
Expand Down
68 changes: 66 additions & 2 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import unittest

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

Expand All @@ -47,8 +48,6 @@ def tearDown(self):
# we do not wait to shutdown py4j client.
self.ssc._jssc.stop()
self.ssc._sc.stop()
# Why does it long time to terminate StremaingContext and SparkContext?
# Should we change the sleep time if this depends on machine spec?
time.sleep(1)

@classmethod
Expand Down Expand Up @@ -455,6 +454,71 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):

return result


class TestStreamingContextSuite(unittest.TestCase):
"""
Should we have conf property in SparkContext?
@property
def conf(self):
return self._conf
"""
def setUp(self):
self.master = "local[2]"
self.appName = self.__class__.__name__
self.batachDuration = Milliseconds(500)
self.sparkHome = "SomeDir"
self.envPair = {"key": "value"}

def tearDown(self):
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
# we do not wait to shutdown py4j client.
self.ssc._jssc.stop()
self.ssc._sc.stop()
# Why does it long time to terminate StremaingContext and SparkContext?
# Should we change the sleep time if this depends on machine spec?
time.sleep(1)

@classmethod
def tearDownClass(cls):
# Make sure tp shutdown the callback server
SparkContext._gateway._shutdown_callback_server()


def test_from_no_conf_constructor(self):
ssc = StreamingContext(master=self.master, appName=self.appName, duration=batachDuration)
# Alternative call master: ssc.sparkContext.master
# I try to make code close to Scala.
self.assertEqual(ssc.sparkContext._conf.get("spark.master"), self.master)
self.assertEqual(ssc.sparkContext._conf.get("spark.app.name"), self.appName)

def test_from_no_conf_plus_spark_home(self):
ssc = StreamingContext(master=self.master, appName=self.appName,
sparkHome=self.sparkHome, duration=batachDuration)
self.assertEqual(ssc.sparkContext._conf.get("spark.home"), self.sparkHome)

def test_from_existing_spark_context(self):
sc = SparkContext(master=self.master, appName=self.appName)
ssc = StreamingContext(sparkContext=sc)

def test_existing_spark_context_with_settings(self):
conf = SparkConf()
conf.set("spark.cleaner.ttl", "10")
sc = SparkContext(master=self.master, appName=self.appName, conf=conf)
ssc = StreamingContext(context=sc)
self.assertEqual(int(ssc.sparkContext._conf.get("spark.cleaner.ttl")), 10)

def _addInputStream(self, s):
test_inputs = map(lambda x: range(1, x), range(5, 101))
# make sure numSlice is 2 due to deserializer proglem in pyspark
s._testInputStream(test_inputs, 2)







if __name__ == "__main__":
unittest.main()
SparkContext._gateway._shutdown_callback_server()

0 comments on commit ee50c5a

Please sign in to comment.