Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent c00e091 commit 3166d31
Showing 7 changed files with 37 additions and 109 deletions.
9 changes: 5 additions & 4 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
@@ -142,17 +142,18 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

def _testInputStream(self, test_inputs, numSlices=None):
"""
This function is only for test.
This implementation is inspired by QueStream implementation.
Give list of RDD to generate DStream which contains the RDD.
This function is only for unittest.
It requires a sequence as input, and returns the i_th element at the i_th batch
under manual clock.
"""
test_rdds = list()
test_rdd_deserializers = list()
for test_input in test_inputs:
test_rdd = self._sc.parallelize(test_input, numSlices)
test_rdds.append(test_rdd._jrdd)
test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

# All deserializers have to be the same.
# TODO: add deserializer validation
jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

28 changes: 11 additions & 17 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
@@ -283,23 +283,6 @@ def func(iterator):
yield list(iterator)
return self.mapPartitions(func)

#def transform(self, func): - TD
# from utils import RDDFunction
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
# jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
# return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW

def _test_output(self, result):
"""
This function is only for test case.
Store data in a DStream to result to verify the result in test case
"""
def get_output(rdd, time):
taken = rdd.collect()
result.append(taken)

self.foreachRDD(get_output)

def cache(self):
"""
Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -404,6 +387,17 @@ def saveAsTextFile(rdd, time):

return self.foreachRDD(saveAsTextFile)

def _test_output(self, result):
"""
This function is only for test case.
Store data in a DStream to result to verify the result in test case
"""
def get_output(rdd, time):
collected = rdd.collect()
result.append(collected)

self.foreachRDD(get_output)


# TODO: implement updateStateByKey
# TODO: implement slice
3 changes: 2 additions & 1 deletion python/pyspark/streaming/jtime.py
Original file line number Diff line number Diff line change
@@ -19,10 +19,11 @@
from pyspark.streaming.duration import Duration

"""
The name of this file, time is not good naming for python
Naming this file "time" is not a good choice in Python,
because if we do `import time` when we want to use the native Python time package,
it does not import the Python time package.
"""
# TODO: add doctest


class Time(object):
12 changes: 9 additions & 3 deletions python/pyspark/streaming/utils.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,9 @@


class RDDFunction():
"""
This class is used for py4j callbacks.
"""
def __init__(self, ctx, jrdd_deserializer, func):
self.ctx = ctx
self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:


def msDurationToString(ms):
#TODO: add doctest
"""
Returns a human-readable string representing a duration such as "35ms"
"""
@@ -54,8 +58,10 @@ def msDurationToString(ms):
else:
return "%.2f h" % (float(ms) / hour)


def rddToFileName(prefix, suffix, time):
    """
    Build a file name of the form "<prefix>-<time>", with ".<suffix>" appended
    when suffix is not None.

    prefix and suffix are concatenated as strings; time is converted with str().

    >>> rddToFileName("spark", None, 12345678940)
    'spark-12345678940'
    >>> rddToFileName("spark", "tmp", 12345678940)
    'spark-12345678940.tmp'
    """
    base = prefix + "-" + str(time)
    # A single None check is enough; an empty-string suffix still appends ".".
    if suffix is None:
        return base
    return base + "." + suffix
Original file line number Diff line number Diff line change
@@ -18,10 +18,8 @@
package org.apache.spark.streaming.api.python

import java.io._
import java.io.{ObjectInputStream, IOException}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.collection.JavaConversions._

@@ -55,7 +53,9 @@ class PythonDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
parent.getOrCompute(validTime) match{
case Some(rdd) =>
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
// create PythonRDD to compute Python functions.
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
preservePartitoning, pythonExec, broadcastVars, accumulator)
Some(pythonRDD.asJavaRDD.rdd)
case None => None
}
@@ -135,8 +135,8 @@ DStream[Array[Byte]](prev.ssc){
case Some(rdd)=>Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
/*
* Since python operation is executed by Scala after StreamingContext.start.
* What PythonPairwiseDStream does is equivalent to python code in pySpark.
* Since the Python function is executed by Scala after StreamingContext.start,
* what PythonPairwiseDStream does is equivalent to the following Python code in pyspark.
*
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -154,23 +154,6 @@ DStream[Array[Byte]](prev.ssc){
}


/**
 * Placeholder test input stream: every batch yields an RDD built from the fixed
 * sequence 1, 2, 3 (requested as 2 slices), regardless of the batch time.
 *
 * @param ssc_ Java streaming context; its underlying StreamingContext is passed to
 *             the InputDStream constructor
 */
class PythonTestInputStream3(ssc_ : JavaStreamingContext)
  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {

  /** No-op: the data is generated on demand in compute. */
  def start(): Unit = {}

  /** No-op. */
  def stop(): Unit = {}

  /**
   * Build the RDD for one batch.
   *
   * `validTime` is not consulted: the batch index that used to be derived from it
   * was never read, so it is no longer computed.
   */
  def compute(validTime: Time): Option[RDD[Any]] = {
    // An immutable Seq is sufficient; nothing ever appends to the input.
    val fixedInput = Seq(1, 2, 3)
    val rdd: RDD[Any] = ssc.sc.makeRDD(fixedInput, 2)
    Some(rdd)
  }

  val asJavaDStream = JavaDStream.fromDStream(this)
}

class PythonForeachDStream(
prev: DStream[Array[Byte]],
foreachFunction: PythonRDDFunction
@@ -184,30 +167,11 @@ class PythonForeachDStream(
this.register()
}

/**
 * DStream that hands each RDD of the parent stream to a PythonRDDFunction callback
 * and emits the RDD the callback returns.
 *
 * @param prev              parent stream of serialized records
 * @param transformFunction callback invoked once per available parent RDD
 */
class PythonTransformedDStream(
    prev: DStream[Array[Byte]],
    transformFunction: PythonRDDFunction
  ) extends DStream[Array[Byte]](prev.ssc) {

  override def dependencies = List(prev)

  override def slideDuration: Duration = prev.slideDuration

  /**
   * Returns None when the parent has no RDD for `validTime`; otherwise the result of
   * calling the callback with the parent RDD and the batch time in milliseconds.
   */
  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
    prev.getOrCompute(validTime) match {
      case Some(parentRdd) =>
        val transformed = transformFunction.call(parentRdd.toJavaRDD(), validTime.milliseconds)
        Some(transformed.rdd)
      case None =>
        None
    }
  }

  val asJavaDStream = JavaDStream.fromDStream(this)
  //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}

/**
* This is an input stream just for the unit test. This is equivalent to a checkpointable,
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch under manual clock.
* This implementation is inspired by QueStream
*/

class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
Original file line number Diff line number Diff line change
@@ -3,6 +3,10 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Time;

/*
 * Interface for a py4j callback function.
 * This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
 */
public interface PythonRDDFunction {
// Invoked with an RDD of byte arrays and a time value; returns an RDD of byte arrays.
// NOTE(review): `time` is presumably the batch time in milliseconds -- confirm
// against the Scala callers of this interface.
JavaRDD<byte[]> call(JavaRDD<byte[]> rdd, long time);
}

This file was deleted.

0 comments on commit 3166d31

Please sign in to comment.