Skip to content

Commit

Permalink
removed unnesessary changes
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 18, 2014
1 parent 3a671cc commit 774f18d
Showing 1 changed file with 1 addition and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.spark.streaming.dstream


import java.io._
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.deprecated
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.util.control.Breaks._

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.{BlockRDD, RDD}
Expand All @@ -34,7 +31,6 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.Duration
import org.apache.spark.api.python.PythonRDD

/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
Expand Down Expand Up @@ -562,11 +558,9 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean

// serialized python
val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
// if transformfunc is fine, it is okay
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}
new TransformedDStream[U](Seq(this), realTransformFunc)
Expand Down

0 comments on commit 774f18d

Please sign in to comment.