From 1f0cfe9617128cf091fdc4afefd7839c3acbaa47 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sat, 11 Jul 2015 10:26:40 +0400 Subject: [PATCH] python style fix --- dev/sparktestsupport/modules.py | 8 +++++++- python/pyspark/streaming/tests.py | 9 ++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 45a03c7ea7447..b283753f2dfd7 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -291,7 +291,13 @@ def contains_file(self, filename): pyspark_streaming = Module( name="pyspark-streaming", - dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly, streaming_mqtt], + dependencies=[ + pyspark_core, + streaming, + streaming_kafka, + streaming_flume_assembly, + streaming_mqtt + ], source_file_regexes=[ "python/pyspark/streaming" ], diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 3012cd1e1a1b7..a4324a748ae32 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -827,6 +827,7 @@ def test_flume_polling(self): def test_flume_polling_multiple_hosts(self): self._testMultipleTimes(self._testFlumePollingMultipleHosts) + class MQTTStreamTests(PySparkStreamingTestCase): timeout = 20 # seconds duration = 1 @@ -841,8 +842,8 @@ def setUp(self): def tearDown(self): if self._MQTTTestUtils is not None: - self._MQTTTestUtils.teardown() - self._MQTTTestUtils = None + self._MQTTTestUtils.teardown() + self._MQTTTestUtils = None super(MQTTStreamTests, self).tearDown() @@ -905,10 +906,11 @@ def search_flume_assembly_jar(): "'build/mvn package' before running this test") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please " - "remove all but one") % flume_assembly_dir) + "remove all but one") % flume_assembly_dir) else: return jars[0] + def search_mqtt_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly") @@ -926,6 +928,7 @@ def search_mqtt_assembly_jar(): else: return jars[0] + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar()