From 3166d3146dbc194c098a4f7a8ad32cadffabdd54 Mon Sep 17 00:00:00 2001
From: giwa
Date: Wed, 20 Aug 2014 16:07:42 -0700
Subject: [PATCH] clean up

---
 python/pyspark/streaming/context.py           |  9 ++--
 python/pyspark/streaming/dstream.py           | 28 +++++------
 python/pyspark/streaming/jtime.py             |  3 +-
 python/pyspark/streaming/utils.py             | 12 +++--
 .../streaming/api/python/PythonDStream.scala  | 48 +++----------------
 .../api/python/PythonRDDFunction.java         |  4 ++
 .../api/python/PythonTransformedDStream.scala | 42 ----------------
 7 files changed, 37 insertions(+), 109 deletions(-)
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index cdd6926bf1c7a..f7e356319ecac 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -142,9 +142,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This function is only for test.
-        This implementation is inspired by QueStream implementation.
-        Give list of RDD to generate DStream which contains the RDD.
+        This function is only for unit testing.
+        It requires a sequence as input, and returns the i-th element at the i-th batch
+        under the manual clock.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -152,7 +152,8 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdd = self._sc.parallelize(test_input, numSlices)
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
-
+        # All deserializers have to be the same.
+        # TODO: add deserializer validation
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0e2641e33032f..0b01a9f02f51f 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -283,23 +283,6 @@ def func(iterator):
             yield list(iterator)
         return self.mapPartitions(func)
 
-    #def transform(self, func): - TD
-    #    from utils import RDDFunction
-    #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
-    #    jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
-    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
-
-    def _test_output(self, result):
-        """
-        This function is only for test case.
-        Store data in a DStream to result to verify the result in test case
-        """
-        def get_output(rdd, time):
-            taken = rdd.collect()
-            result.append(taken)
-
-        self.foreachRDD(get_output)
-
     def cache(self):
         """
         Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -404,6 +387,17 @@ def saveAsTextFile(rdd, time):
 
         return self.foreachRDD(saveAsTextFile)
 
+    def _test_output(self, result):
+        """
+        This function is only for test cases.
+        Store the data from a DStream into result so that a test case can verify it.
+        """
+        def get_output(rdd, time):
+            collected = rdd.collect()
+            result.append(collected)
+
+        self.foreachRDD(get_output)
+
     # TODO: implement updateStateByKey
 
     # TODO: implement slice
diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py
index 32ef741051283..f169228e81868 100644
--- a/python/pyspark/streaming/jtime.py
+++ b/python/pyspark/streaming/jtime.py
@@ -19,10 +19,11 @@
 from pyspark.streaming.duration import Duration
 
 """
-The name of this file, time is not good naming for python
+The name of this file, time, is not a good name for Python,
 because if we do import time when we want to use native python
 time package, it does not import python time package.
 """
+# TODO: add doctest
 
 
 class Time(object):
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
index 9178577743e0b..5ba179cae7f9c 100644
--- a/python/pyspark/streaming/utils.py
+++ b/python/pyspark/streaming/utils.py
@@ -19,6 +19,9 @@
 
 
 class RDDFunction():
+    """
+    This class wraps a Python function as a py4j callback.
+    """
     def __init__(self, ctx, jrdd_deserializer, func):
         self.ctx = ctx
         self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:
 
 
 def msDurationToString(ms):
+    #TODO: add doctest
     """
     Returns a human-readable string representing a duration such as "35ms"
     """
@@ -54,8 +58,10 @@ def msDurationToString(ms):
     else:
         return "%.2f h" % (float(ms) / hour)
 
+
 def rddToFileName(prefix, suffix, time):
-    if suffix is not None:
-        return prefix + "-" + str(time) + "." + suffix
-    else:
+    #TODO: add doctest
+    if suffix is None:
         return prefix + "-" + str(time)
+    else:
+        return prefix + "-" + str(time) + "." + suffix
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2c44f6cc1d42f..3c9fff6f4bf5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.streaming.api.python
 
 import java.io._
-import java.io.{ObjectInputStream, IOException}
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
 
@@ -55,7 +53,9 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
-        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
+        // create PythonRDD to compute Python functions.
+        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
+          preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
     }
@@ -135,8 +135,8 @@ DStream[Array[Byte]](prev.ssc){
       case Some(rdd)=>Some(rdd)
         val pairwiseRDD = new PairwiseRDD(rdd)
         /*
-         * Since python operation is executed by Scala after StreamingContext.start.
-         * What PythonPairwiseDStream does is equivalent to python code in pySpark.
+         * Since the Python function is executed by Scala after StreamingContext.start(),
+         * what PythonPairwiseDStream does is equivalent to the following python code in pyspark.
         *
         * with _JavaStackTrace(self.context) as st:
         *    pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -154,23 +154,6 @@ DStream[Array[Byte]](prev.ssc){
 }
 
 
-class PythonTestInputStream3(ssc_ : JavaStreamingContext)
-  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Any]] = {
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
-    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
 class PythonForeachDStream(
     prev: DStream[Array[Byte]],
     foreachFunction: PythonRDDFunction
@@ -184,30 +167,11 @@ class PythonForeachDStream(
   this.register()
 }
 
 
-class PythonTransformedDStream(
-    prev: DStream[Array[Byte]],
-    transformFunction: PythonRDDFunction
-  ) extends DStream[Array[Byte]](prev.ssc) {
-
-  override def dependencies = List(prev)
-
-  override def slideDuration: Duration = prev.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    prev.getOrCompute(validTime).map(rdd => {
-      transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
-    })
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-  //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
-}
 
 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
 * returns the i_th element at the i_th batch under manual clock.
- * This implementation is inspired by QueStream
 */
 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
index 88f7036c3a05b..b46a644dacb7c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
@@ -3,6 +3,10 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.Time;
 
+/*
+ * Interface for a py4j callback function.
+ * This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
+ */ public interface PythonRDDFunction { JavaRDD call(JavaRDD rdd, long time); } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala deleted file mode 100644 index bc07e09ec6d03..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - -package org.apache.spark.streaming.api.python - -import org.apache.spark.Accumulator -import org.apache.spark.api.python.PythonRDD -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.api.java.JavaDStream -import org.apache.spark.streaming.{Time, Duration} -import org.apache.spark.streaming.dstream.DStream - -import scala.reflect.ClassTag - -class PythonTransformedDStream[T: ClassTag]( - parent: DStream[T], - command: Array[Byte], - envVars: JMap[String, String], - pythonIncludes: JList[String], - preservePartitoning: Boolean, - pythonExec: String, - broadcastVars: JList[Broadcast[Array[Byte]]], - accumulator: Accumulator[JList[Array[Byte]]] - ) extends DStream[Array[Byte]](parent.ssc) { - - override def dependencies = List(parent) - - override def slideDuration: Duration = parent.slideDuration - - //pythonDStream compute - override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { - -// val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq -// parents.map(_.getOrCompute(validTime).orNull).to -// parent = parents.head.asInstanceOf[RDD] -// Some() - } - - val asJavaDStream = JavaDStream.fromDStream(this) -} - -*/
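As a rough illustration of how the test-only pieces touched by this patch fit together, the sketch below combines them the way a unit test might: _testInputStream feeds one RDD per batch under the manual clock, _test_output collects each batch through the foreachRDD / RDDFunction py4j callback path, and the result list can then be asserted on. This is not code from the patch; the StreamingContext constructor, the Duration(milliseconds) signature, and the availability of DStream.map are assumptions about the surrounding prototype API, while _testInputStream and _test_output are the helpers defined above.

    # Minimal sketch (assumed API, not included in this patch).
    from pyspark.context import SparkContext
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Duration

    sc = SparkContext(appName="dstream-test")
    ssc = StreamingContext(sc, Duration(1000))        # assumed: (SparkContext, batch duration in ms)

    result = []
    batches = [[1, 2, 3], [4, 5, 6]]                  # the i-th list becomes the i-th batch
    stream = ssc._testInputStream(batches)            # test-only input stream from this patch
    stream.map(lambda x: x * 2)._test_output(result)  # _test_output collects every batch via foreachRDD

    ssc.start()
    # ... once both batches have run under the manual clock:
    # result == [[2, 4, 6], [8, 10, 12]]
    ssc.stop()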