diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index bafff80adc54b..46ef05d9c37a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -17,14 +17,11 @@
 
 package org.apache.spark.streaming.dstream
 
-
-import java.io._
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-import scala.util.control.Breaks._
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
@@ -34,7 +31,6 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.Duration
-import org.apache.spark.api.python.PythonRDD
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -562,11 +558,9 @@ abstract class DStream[T: ClassTag] (
 
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    // serialized python
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
-      // if transformfunc is fine, it is okay
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
     new TransformedDStream[U](Seq(this), realTransformFunc)
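
For context on the last hunk: `cleanedF` backs the public `DStream.transform` overload that takes a `(RDD[T], Time) => RDD[U]` function, which is closure-cleaned once via `SparkContext.clean(transformFunc, false)` when `transform` is called. A minimal usage sketch follows; the socket source, host/port, batch interval, and object name are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

// Hypothetical driver program, only to illustrate the API touched above.
object TransformSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransformSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder input stream for this sketch.
    val lines = ssc.socketTextStream("localhost", 9999)

    // This (RDD[String], Time) => RDD[(Long, String)] function is what
    // `transformFunc` refers to in the patched method body: it is applied
    // to each batch's RDD along with that batch's time.
    val stamped = lines.transform((rdd: RDD[String], time: Time) =>
      rdd.map(line => (time.milliseconds, line)))

    stamped.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```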