From c5ecfc1f5d65f8458c3919a2371ada4d32e8f03d Mon Sep 17 00:00:00 2001
From: giwa
Date: Thu, 14 Aug 2014 23:42:34 -0700
Subject: [PATCH] basic function test cases are passed

---
 python/pyspark/worker.py                      | 10 ---
 .../streaming/api/python/PythonDStream.scala  | 62 +------------------
 2 files changed, 1 insertion(+), 71 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 78e56143fb3ab..90ea7b453d401 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -86,16 +86,6 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
-        print "deserializer in worker: %s" % str(deserializer)
-        iterator, walk = itertools.tee(iterator)
-        if isinstance(walk, int):
-            print "this is int"
-            print walk
-        else:
-            try:
-                print list(walk)
-            except:
-                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 0bafe3f846793..0186003f70c49 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -206,48 +206,6 @@ class PythonTransformedDStream(
   }
 */
 
-<<<<<<< HEAD
-=======
-/**
- * This is a input stream just for the unitest. This is equivalent to a checkpointable,
- * replayable, reliable message queue like Kafka. It requires a sequence as input, and
- * returns the i_th element at the i_th batch under manual clock.
- */
-class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
-  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    logInfo("Computing RDD for time " + validTime)
-    inputFiles.foreach(logInfo(_))
-    // make a temporary file
-    // make empty RDD
-    val prefix = "spark"
-    val suffix = ".tmp"
-    val tempFile = File.createTempFile(prefix, suffix)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    logInfo("Index: " + index)
-
-    val selectedInputFile: String = {
-      if (inputFiles.isEmpty){
-        tempFile.getAbsolutePath
-      }else if (index < inputFiles.size()) {
-        inputFiles.get(index)
-      } else {
-        tempFile.getAbsolutePath
-      }
-    }
-    val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
-    logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
@@ -255,7 +213,7 @@ class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[
  * This implementation is close to QueStream
  */
-class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
+class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
   extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {
 
   def start() {}
@@ -280,21 +238,3 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
-
-
-class PythonTestInputStream3(ssc_ : JavaStreamingContext)
-  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Any]] = {
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
-    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}>>>>>>> broke something
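For context on the class this patch keeps (renamed from PythonTestInputStream2 to PythonTestInputStream): its doc comment describes it as a test-only input stream that acts like a checkpointable, replayable message queue, serving the i-th queued RDD as the i-th batch, similar to QueueStream. Below is a minimal, hypothetical sketch of how a test might drive it. Only the constructor signature and asJavaDStream come from the patch; the demo object name, local master, batch interval, sample byte payloads, and timeout are assumptions, and the patch's own comment notes the indexing is intended for use under a manual clock.

// Hypothetical test wiring (not part of this patch): feed pre-serialized
// byte-array RDDs to the retained PythonTestInputStream, one RDD per batch.
import java.util.{ArrayList => JArrayList, Arrays => JArrays}

import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.api.python.PythonTestInputStream

object PythonTestInputStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("PythonTestInputStreamDemo")
    val jsc = new JavaSparkContext(conf)
    val jssc = new JavaStreamingContext(jsc, Seconds(1))

    // One queued RDD per batch interval; payloads are already-serialized bytes.
    val inputRDDs = new JArrayList[JavaRDD[Array[Byte]]]()
    inputRDDs.add(jsc.parallelize(JArrays.asList("a".getBytes, "b".getBytes)))
    inputRDDs.add(jsc.parallelize(JArrays.asList("c".getBytes)))

    // Constructor and asJavaDStream are as defined in PythonDStream.scala above.
    val stream = new PythonTestInputStream(jssc, inputRDDs)
    stream.asJavaDStream.count().print()

    jssc.start()
    jssc.awaitTermination(5000)
    jssc.stop()
  }
}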