Skip to content

Commit

Permalink
improve test case
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 21, 2014
1 parent 583e66d commit b7dab85
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 11 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pyspark.rdd import _JavaStackTrace
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.streaming.utils import rddToFileName, RDDFunction
from pyspark.streaming.util import rddToFileName, RDDFunction


from py4j.java_collections import ListConverter, MapConverter
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/streaming/duration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

from pyspark.streaming import utils
from pyspark.streaming import util


class Duration(object):
Expand Down Expand Up @@ -82,7 +82,7 @@ def prettyPrint(self):
>>> d_1hour.prettyPrint()
'1.00 h'
"""
return utils.msDurationToString(self._millis)
return util.msDurationToString(self._millis)

def milliseconds(self):
"""
Expand Down
7 changes: 3 additions & 4 deletions python/pyspark/streaming/jtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

from pyspark.streaming import utils
from pyspark.streaming.duration import Duration

"""
Expand Down Expand Up @@ -87,7 +86,7 @@ def __sub__(self, other):
if isinstance(other, Duration):
return Time(self._millis - other._millis)
elif isinstance(other, Time):
return Duration(self._mills, other._millis)
return Duration(self._millis, other._millis)
else:
raise TypeError

Expand All @@ -99,7 +98,7 @@ def __lt__(self, other):
def __le__(self, other):
""" Time <= Time """
Time._is_time(other)
return self.millis <= other._millis
return self._millis <= other._millis

def __eq__(self, other):
""" Time == Time """
Expand All @@ -121,7 +120,7 @@ def __ge__(self, other):
Time._is_time(other)
return self._millis >= other._millis

def isMultipbleOf(duration):
def isMultipbleOf(self, duration):
""" is multiple by Duration """
Duration._is_duration(duration)
return self._millis % duration._millis == 0
Expand Down
8 changes: 7 additions & 1 deletion python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
"""
from itertools import chain
import time
import unittest
import operator
import sys

if sys.version_info[:2] <= (2, 6):
import unittest2 as unittest
else:
import unittest

from pyspark.context import SparkContext
from pyspark.streaming.context import StreamingContext
Expand Down Expand Up @@ -451,3 +456,4 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):

if __name__ == "__main__":
unittest.main()
SparkContext._gateway._shutdown_callback_server()
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

class RDDFunction():
"""
This class is for py4j callback. This
This class is for py4j callback. This class is related with
org.apache.spark.streaming.api.python.PythonRDDFunction.
"""
def __init__(self, ctx, jrdd_deserializer, func):
self.ctx = ctx
Expand All @@ -41,10 +42,19 @@ class Java:


def msDurationToString(ms):
#TODO: add doctest
"""
Returns a human-readable string representing a duration such as "35ms"
>> msDurationToString(10)
'10 ms'
>>> msDurationToString(1000)
'1.0 s'
>>> msDurationToString(60000)
'1.0 m'
>>> msDurationToString(3600000)
'1.00 h'
"""
#TODO: add doctest
second = 1000
minute = 60 * second
hour = 60 * minute
Expand All @@ -60,7 +70,15 @@ def msDurationToString(ms):


def rddToFileName(prefix, suffix, time):
#TODO: add doctest
"""
Return string prefix-time(.suffix)
>>> rddToFileName("spark", None, 12345678910)
'spark-12345678910'
>>> rddToFileName("spark", "tmp", 12345678910)
'spark-12345678910.tmp'
"""
if suffix is None:
return prefix + "-" + str(time)
else:
Expand Down
2 changes: 2 additions & 0 deletions python/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ run_test "pyspark/broadcast.py"
run_test "pyspark/accumulators.py"
run_test "pyspark/serializers.py"
run_test "pyspark/streaming/duration.py"
run_test "pyspark/streaming/util.py"
unset PYSPARK_DOC_TEST
run_test "pyspark/shuffle.py"
run_test "pyspark/tests.py"
Expand All @@ -81,6 +82,7 @@ run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/tests.py"
run_test "pyspark/mllib/util.py"
run_test "pyspark/streaming/tests.py"

if [[ $FAILED == 0 ]]; then
echo -en "\033[32m" # Green
Expand Down

0 comments on commit b7dab85

Please sign in to comment.