From 69e9cd33a58b880f96cc9c3e5e62eaa415c49843 Mon Sep 17 00:00:00 2001
From: Ken Takagiwa
Date: Wed, 16 Jul 2014 11:07:42 -0700
Subject: [PATCH] implementing transform function in Python

---
Review notes (commentary only; this text sits between the "---" separator and
the first "diff --git" line, so it is not part of the commit message or of
the change itself):

* Formatting: this copy of the patch had lost its line breaks and
  indentation. Line breaks are restored from the hunk markers and line
  counts; leading whitespace is reconstructed from the surrounding code and
  should be checked against the original commit before applying.
* python/pyspark/mllib/_common.py: the removed and the added line carry the
  same text, so the change appears to be whitespace-only. The exact
  whitespace difference cannot be recovered from this copy; both lines are
  shown with the same indentation here.
* python/pyspark/streaming/dstream.py, transform(): the value returned by
  self._jdstream.transform(transformFunc) is discarded and the following
  statement raises NotImplementedError, so the method still always raises.
* python/pyspark/streaming/dstream.py, _defaultReducePartitions(): the
  hard-coded "return 2" is removed, but the comment "hard code to avoid the
  error" stays as context and no longer describes the code that follows it.
* PythonTransformedDStream.scala (new file):
  - the constructor parameter is named "parents", but the body refers to
    "parent" (parent.ssc, List(parent), parent.slideDuration);
  - JMap and JList are used in the signature without a matching import in
    this file;
  - compute() binds parentRDDs but never uses it, and ends in "Some()" with
    no RDD argument although the declared result is
    Option[RDD[Array[Byte]]];
  - none of command, envVars, pythonIncludes, preservePartitoning,
    pythonExec, broadcastVars or accumulator is read in the body.
  NOTE(review): this looks like work in progress - confirm whether the file
  is expected to compile at this commit.
* streaming/.../dstream/DStream.scala: the hunk adds two comments and one
  blank line; no statement is changed.

 python/pyspark/mllib/_common.py | 2 +-
 python/pyspark/streaming/dstream.py | 3 +-
 .../api/python/PythonTransformedDStream.scala | 37 +++++++++++++++++++
 .../spark/streaming/dstream/DStream.scala | 3 ++
 4 files changed, 42 insertions(+), 3 deletions(-)
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e609b60a0f968..4b723693f43e3 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0):
     nb = len(ba) - offset
     if nb < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                        "which is too short" % nb)
+                        "which is too short" % nb)
     if ba[offset] == DENSE_VECTOR_MAGIC:
         return _deserialize_dense_vector(ba, offset)
     elif ba[offset] == SPARSE_VECTOR_MAGIC:
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index e144f8bc1cc09..3365c6d69c1a2 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -172,7 +172,6 @@ def _mergeCombiners(iterator):
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
 
-
     def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the DStream partitioned using the specified partitioner.
@@ -231,6 +230,7 @@ def slice(self, fromTime, toTime):
     def transform(self, transformFunc):
         """
         """
+        self._jdstream.transform(transformFunc)
         raise NotImplementedError
 
     def transformWith(self, other, transformFunc):
@@ -264,7 +264,6 @@ def _defaultReducePartitions(self):
 
         """
         # hard code to avoid the error
-        return 2
         if self.ctx._conf.contains("spark.default.parallelism"):
             return self.ctx.defaultParallelism
         else:
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
new file mode 100644
index 0000000000000..ff70483b771a4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
@@ -0,0 +1,37 @@
+package org.apache.spark.streaming.api.python
+
+import org.apache.spark.Accumulator
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.streaming.{Time, Duration}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Created by ken on 7/15/14.
+ */
+class PythonTransformedDStream[T: ClassTag](
+    parents: Seq[DStream[T]],
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]]
+  ) extends DStream[Array[Byte]](parent.ssc) {
+
+  override def dependencies = List(parent)
+
+  override def slideDuration: Duration = parent.slideDuration
+
+  //pythonDStream compute
+  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    Some()
+  }
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d9d5446b62e9f..67977244ef420 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -561,9 +561,12 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
+
+    // serialized python
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
+      // if transformfunc is fine, it is okay
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
     new TransformedDStream[U](Seq(this), realTransformFunc)