Skip to content

Commit

Permalink
fixed PEP-008 violation
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent f21cab3 commit 3d37822
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 30 deletions.
5 changes: 0 additions & 5 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@
from signal import signal, SIGTERM, SIGINT
from tempfile import NamedTemporaryFile

from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import *
from pyspark.rdd import RDD
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream

Expand Down
19 changes: 12 additions & 7 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _sum(self):

def print_(self, label=None):
"""
Since print is reserved name for python, we cannot define a print method function.
Since print is a reserved name in Python, we cannot define a method named "print".
This function prints the serialized data of each RDD in the DStream, because Scala and Java
cannot deserialize pickled Python objects. Please use DStream.pyprint() instead to print results.
Expand Down Expand Up @@ -159,8 +159,8 @@ def partitionBy(self, numPartitions, partitionFunc=None):
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
outputSerializer = self.ctx._unbatched_serializer
def add_shuffle_key(split, iterator):

def add_shuffle_key(split, iterator):
buckets = defaultdict(list)

for (k, v) in iterator:
Expand Down Expand Up @@ -218,6 +218,11 @@ def getNumPartitions(self):

def foreachRDD(self, func):
"""
Apply a user-defined function to every RDD in a DStream.
This Python implementation could be expensive because it uses a callback server
in order to apply the function to each RDD in the DStream.
This is an output operator, so this DStream will be registered as an output
stream and therefore materialized.
"""
from utils import RDDFunction
wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
Expand All @@ -227,7 +232,6 @@ def pyprint(self):
"""
Print the first ten elements of each RDD generated in this DStream. This is an output
operator, so this DStream will be registered as an output stream and therefore materialized.
"""
def takeAndPrint(rdd, time):
taken = rdd.take(11)
Expand All @@ -248,14 +252,15 @@ def takeAndPrint(rdd, time):
# jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
# return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW

def _test_output(self, buff):
def _test_output(self, result):
"""
This function is only for testcase.
Store data in dstream to buffer to valify the result in tesecase
This function is only for test cases.
Store the data of a DStream in result so that the test case can verify it.
"""
def get_output(rdd, time):
taken = rdd.collect()
buff.append(taken)
result.append(taken)

self.foreachRDD(get_output)


Expand Down
37 changes: 19 additions & 18 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,10 @@
to focusing to streaming test case
"""
from fileinput import input
from glob import glob
from itertools import chain
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
import zipfile
import operator

from pyspark.context import SparkContext
Expand All @@ -44,12 +36,14 @@

SPARK_HOME = os.environ["SPARK_HOME"]


class StreamOutput:
"""
A class to store the output collected from a stream.
"""
# Class-level list, so it is reachable as StreamOutput.result without an instance.
result = list()


class PySparkStreamingTestCase(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
Expand All @@ -69,6 +63,7 @@ def tearDownClass(cls):
time.sleep(5)
SparkContext._gateway._shutdown_callback_server()


class TestBasicOperationsSuite(PySparkStreamingTestCase):
"""
Input and output of this TestBasicOperationsSuite is the equivalent to
Expand All @@ -77,7 +72,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
def setUp(self):
PySparkStreamingTestCase.setUp(self)
StreamOutput.result = list()
self.timeout = 10 # seconds
self.timeout = 10 # seconds

def tearDown(self):
PySparkStreamingTestCase.tearDown(self)
Expand All @@ -88,7 +83,8 @@ def tearDownClass(cls):

def test_map(self):
"""Basic operation test for DStream.map"""
test_input = [range(1,5), range(5,9), range(9, 13)]
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
return dstream.map(lambda x: str(x))
expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
Expand All @@ -97,17 +93,19 @@ def test_func(dstream):

def test_flatMap(self):
"""Basic operation test for DStream.flatMap"""
test_input = [range(1,5), range(5,9), range(9, 13)]
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
return dstream.flatMap(lambda x: (x, x * 2))
expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
test_input)
test_input)
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_filter(self):
"""Basic operation test for DStream.filter"""
test_input = [range(1,5), range(5,9), range(9, 13)]
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
return dstream.filter(lambda x: x % 2 == 0)
expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input)
Expand All @@ -116,7 +114,8 @@ def test_func(dstream):

def test_count(self):
"""Basic operation test for DStream.count"""
test_input = [[], [1], range(1, 3), range(1,4), range(1,5)]
test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]

def test_func(dstream):
return dstream.count()
expected_output = map(lambda x: [len(x)], test_input)
Expand All @@ -125,7 +124,8 @@ def test_func(dstream):

def test_reduce(self):
"""Basic operation test for DStream.reduce"""
test_input = [range(1,5), range(5,9), range(9, 13)]
test_input = [range(1, 5), range(5, 9), range(9, 13)]

def test_func(dstream):
return dstream.reduce(operator.add)
expected_output = map(lambda x: [reduce(operator.add, x)], test_input)
Expand All @@ -135,19 +135,20 @@ def test_func(dstream):
def test_reduceByKey(self):
"""Basic operation test for DStream.reduceByKey"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 1)],[("", 2)], []]
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def _run_stream(self, test_input, test_func, expected_output):
"""Start stream and return the output"""
# Generate input stream with user-defined input
test_input_stream = self.ssc._testInputStream(test_input)
# Applyed test function to stream
# Applied test function to stream
test_stream = test_func(test_input_stream)
# Add job to get outpuf from stream
# Add job to get output from stream
test_stream._test_output(StreamOutput.result)
self.ssc.start()

Expand Down

0 comments on commit 3d37822

Please sign in to comment.