diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 0123e4e8633fc..8b355bf6b7d79 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -253,53 +253,6 @@ def _test_func(self, input, func, expected, numSlices=None, sort=False):
         self.assertEqual(expected, result)
 
 
-class TestTransform(PySparkStreamingTestCase):
-    def setUp(self):
-        PySparkStreamingTestCase.setUp(self)
-        self.timeout = 10
-
-    def test_transform(self):
-        input = [range(1, 5), range(5, 9), range(9, 13)]
-
-        def func(stream):
-            return stream.transform(lambda r: r and r.map(str))
-
-        expected = map(lambda x: map(str, x), input)
-        self._test_func(input, func, expected)
-        self.assertEqual(expected, output)
-
-    def _test_func(self, input, func, expected, numSlices=None):
-        """
-        Start stream and return the result.
-        @param input: dataset for the test. This should be list of lists.
-        @param func: wrapped function. This function should return PythonDStream object.
-        @param expected: expected output for this testcase.
-        @param numSlices: the number of slices in the rdd in the dstream.
-        """
-        # Generate input stream with user-defined input.
-        input_stream = self.ssc._makeStream(input, numSlices)
-        # Apply test function to stream.
-        stream = func(input_stream)
-        result = stream.collect()
-        self.ssc.start()
-
-        start_time = time.time()
-        # Loop until get the expected the number of the result from the stream.
-        while True:
-            current_time = time.time()
-            # Check time out.
-            if (current_time - start_time) > self.timeout:
-                break
-            # StreamingContext.awaitTermination is not used to wait because
-            # if py4j server is called every 50 milliseconds, it gets an error.
-            time.sleep(0.05)
-            # Check if the output is the same length of expected output.
-            if len(expected) == len(result):
-                break
-
-        return result
-
-
 class TestStreamingContext(unittest.TestCase):
     """
     Should we have conf property in SparkContext?
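
Note: the deleted class duplicated the `_test_func` helper already defined at line 253 of this file (visible in the hunk header), which is why it could be removed wholesale. For context, here is a minimal, self-contained sketch of the DStream.transform pattern the removed test exercised. It is illustrative only and not part of this diff: it assumes the public queueStream input source of released PySpark in place of the internal ssc._makeStream helper, and a bounded awaitTerminationOrTimeout wait in place of the manual polling loop.

    # Minimal sketch (assumed setup, not part of this diff): DStream.transform
    # applies an RDD-to-RDD function to each batch of a stream.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "transform-sketch")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # queueStream turns a list of RDDs into a DStream, one RDD per batch
    # (an assumed stand-in for the internal ssc._makeStream in the test).
    rdds = [sc.parallelize(r) for r in [range(1, 5), range(5, 9), range(9, 13)]]
    stream = ssc.queueStream(rdds)

    # Each batch's ints become strings, mirroring the removed test's lambda.
    stream.transform(lambda rdd: rdd.map(str)).pprint()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5)  # bounded wait instead of a poll loop
    ssc.stop(stopSparkContext=True, stopGraceFully=False)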