remove reuse of PythonRDD
davies committed Oct 2, 2014
1 parent be5e5ff commit d05871e
Showing 3 changed files with 20 additions and 51 deletions.
28 changes: 11 additions & 17 deletions python/pyspark/streaming/dstream.py
@@ -286,13 +286,11 @@ def transform(self, func):
         `func` can have one argument of `rdd`, or have two arguments of
         (`time`, `rdd`)
         """
-        reuse = False
         if func.func_code.co_argcount == 1:
-            reuse = True
             oldfunc = func
             func = lambda t, rdd: oldfunc(rdd)
         assert func.func_code.co_argcount == 2, "func should take one or two arguments"
-        return TransformedDStream(self, func, reuse)
+        return TransformedDStream(self, func)
 
     def transformWith(self, func, other, keepSerializer=False):
         """
@@ -597,34 +595,30 @@ class TransformedDStream(DStream):
     Multiple continuous transformations of DStream can be combined into
     one transformation.
     """
-    def __init__(self, prev, func, reuse=False):
+    def __init__(self, prev, func):
         ssc = prev._ssc
         self._ssc = ssc
         self.ctx = ssc._sc
         self._jrdd_deserializer = self.ctx.serializer
         self.is_cached = False
         self.is_checkpointed = False
+        self._jdstream_val = None
 
         if (isinstance(prev, TransformedDStream) and
                 not prev.is_cached and not prev.is_checkpointed):
             prev_func = prev.func
-            old_func = func
-            func = lambda t, rdd: old_func(t, prev_func(t, rdd))
-            reuse = reuse and prev.reuse
-            prev = prev.prev
-
-        self.prev = prev
-        self.func = func
-        self.reuse = reuse
-        self._jdstream_val = None
+            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
+            self.prev = prev.prev
+        else:
+            self.prev = prev
+            self.func = func
 
     @property
     def _jdstream(self):
         if self._jdstream_val is not None:
             return self._jdstream_val
 
         jfunc = TransformFunction(self.ctx, self.func, self.prev._jrdd_deserializer)
-        jdstream = self.ctx._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(),
-                                                          jfunc, self.reuse).asJavaDStream()
-        self._jdstream_val = jdstream
-        return jdstream
+        dstream = self.ctx._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
+        self._jdstream_val = dstream.asJavaDStream()
+        return self._jdstream_val
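With `reuse` gone, what remains of `TransformedDStream.__init__` is the folding of consecutive transformations into one function, as the class docstring says. A toy sketch of the composition it builds (the functions `f` and `g` are hypothetical, standing for two already-normalized two-argument transform functions):

    f = lambda t, rdd: rdd.map(lambda x: x + 1)
    g = lambda t, rdd: rdd.filter(lambda x: x % 2 == 0)

    # dstream.transform(f).transform(g) ends up storing a single composed
    # function and pointing at the original parent DStream:
    composed = lambda t, rdd: g(t, f(t, rdd))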
4 changes: 2 additions & 2 deletions python/pyspark/streaming/tests.py
@@ -504,7 +504,7 @@ def setup():
             return ssc
 
         cpd = tempfile.mkdtemp("test_streaming_cps")
-        ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
        ssc.start()
 
         def check_output(n):
@@ -539,7 +539,7 @@ def check_output(n):
         ssc.stop(True, True)
 
         time.sleep(1)
-        ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
         ssc.start()
         check_output(3)
 
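The test now keeps the recreated context on `self.ssc` as well as the local `ssc`; the checkpoint-recovery API it exercises is `StreamingContext.getOrCreate`. A minimal sketch of that pattern (not from this diff; the directory path and the `create_context` builder are illustrative):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    cpd = "/tmp/streaming_checkpoint"      # assumed checkpoint directory

    def create_context():
        # called only when no usable checkpoint exists under cpd
        sc = SparkContext(appName="checkpoint-demo")
        ssc = StreamingContext(sc, 1)      # 1-second batches
        ssc.checkpoint(cpd)
        # ... build the DStream graph here ...
        return ssc

    ssc = StreamingContext.getOrCreate(cpd, create_context)
    ssc.start()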
@@ -157,43 +157,18 @@ private[python] abstract class PythonDStream(
 
 /**
  * Transformed DStream in Python.
- *
- * If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it
- * as an template for future use, this can reduce the Python callbacks.
  */
 private[python] class PythonTransformedDStream (
     parent: DStream[_],
-    @transient pfunc: PythonTransformFunction,
-    var reuse: Boolean = false)
+    @transient pfunc: PythonTransformFunction)
   extends PythonDStream(parent, pfunc) {
 
-  // rdd returned by func
-  var lastResult: PythonRDD = _
-
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     val rdd = parent.getOrCompute(validTime)
-    if (rdd.isEmpty) {
-      return None
-    }
-    if (reuse && lastResult != null) {
-      // use the previous result as the template to generate new RDD
-      Some(lastResult.copyTo(rdd.get))
+    if (rdd.isDefined) {
+      func(rdd, validTime)
     } else {
-      val r = func(rdd, validTime)
-      if (reuse && r.isDefined && lastResult == null) {
-        // try to use the result as a template
-        r.get match {
-          case pyrdd: PythonRDD =>
-            if (pyrdd.firstParent == rdd) {
-              // only one PythonRDD
-              lastResult = pyrdd
-            } else {
-              // maybe have multiple stages, don't check it anymore
-              reuse = false
-            }
-        }
-      }
-      r
+      None
     }
   }
 }
@@ -209,18 +184,18 @@ private[python] class PythonTransformed2DStream(
 
   val func = new TransformFunction(pfunc)
 
-  override def slideDuration: Duration = parent.slideDuration
-
   override def dependencies = List(parent, parent2)
 
+  override def slideDuration: Duration = parent.slideDuration
+
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     val empty: RDD[_] = ssc.sparkContext.emptyRDD
     val rdd1 = parent.getOrCompute(validTime).getOrElse(empty)
     val rdd2 = parent2.getOrCompute(validTime).getOrElse(empty)
     func(Some(rdd1), Some(rdd2), validTime)
   }
 
-  val asJavaDStream = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
 /**
