From 4bcb31888aa03a6441711f291c64e78e8b259308 Mon Sep 17 00:00:00 2001
From: Ken Takagiwa
Date: Wed, 16 Jul 2014 11:07:42 -0700
Subject: [PATCH] implementing transform function in Python

---
 python/pyspark/streaming/dstream.py                |  1 -
 .../api/python/PythonTransformedDStream.scala      | 37 +++++++++++++++++++
 .../spark/streaming/dstream/DStream.scala          |  1 +
 3 files changed, 38 insertions(+), 1 deletion(-)
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 224d2bbdeeb53..5a56a3d958254 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -379,7 +379,6 @@ def saveAsTextFiles(self, prefix, suffix=None):
         """
         Save this DStream as a text file, using string representations of elements.
         """
-
         def saveAsTextFile(rdd, time):
             path = rddToFileName(prefix, suffix, time)
             rdd.saveAsTextFile(path)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
new file mode 100644
index 0000000000000..ff70483b771a4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
@@ -0,0 +1,37 @@
+package org.apache.spark.streaming.api.python
+
+import java.util.{List => JList, Map => JMap}
+import org.apache.spark.Accumulator
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * A DStream whose batches come from one or more parent DStreams and are
+ * meant to be transformed by a serialized Python function (`command`).
+ */
+class PythonTransformedDStream[T: ClassTag](
+    parents: Seq[DStream[T]],
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]]
+  ) extends DStream[Array[Byte]](parents.head.ssc) {
+
+  override def dependencies = parents.toList
+
+  override def slideDuration: Duration = parents.head.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    None // TODO: pipe parentRDDs through the Python command (via PythonRDD)
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 5377dfa52d461..13032fca15616 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -563,6 +563,7 @@ abstract class DStream[T: ClassTag] (
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
+      // Single-parent transform: hand the one parent RDD to the cleaned user function.
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
     new TransformedDStream[U](Seq(this), realTransformFunc)