diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 5dcc9ba35a653..a4900191d1730 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -22,15 +22,15 @@ from pyspark.storagelevel import * from pyspark.rdd import RDD from pyspark.context import SparkContext +from pyspark.streaming.dstream import DStream from py4j.java_collections import ListConverter -from pyspark.streaming.dstream import DStream class StreamingContext(object): """ Main entry point for Spark Streaming functionality. A StreamingContext represents the - connection to a Spark cluster, and can be used to create L{RDD}s and + connection to a Spark cluster, and can be used to create L{DStream}s and broadcast variables on that cluster. """ @@ -71,13 +71,16 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, def _initialize_context(self, jspark_context, jduration): return self._jvm.JavaStreamingContext(jspark_context, jduration) - def actorStream(self, props, name, storageLevel, supervisorStrategy): - raise NotImplementedError - - def addStreamingListener(self, streamingListener): - raise NotImplementedError + def start(self): + """ + Start the execution of the streams. + """ + self._jssc.start() def awaitTermination(self, timeout=None): + """ + Wait for the execution to stop. + """ if timeout: self._jssc.awaitTermination(timeout) else: @@ -85,20 +88,18 @@ def awaitTermination(self, timeout=None): # start from simple one. storageLevel is not passed for now. def socketTextStream(self, hostname, port): + """ + Create an input from TCP source hostname:port. Data is received using + a TCP socket and receive byte is interpreted as UTF8 encoded '\n' delimited + lines. + """ return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer()) - def start(self): - self._jssc.start() - - def stop(self, stopSparkContext=True): - raise NotImplementedError - def textFileStream(self, directory): + """ + Create an input stream that monitors a Hadoop-compatible file system + for new files and reads them as text files. Files must be wrriten to the + monitored directory by "moving" them from another location within the same + file system. FIle names starting with . are ignored. + """ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) - - def transform(self, seq): - raise NotImplementedError - - def union(self, seq): - raise NotImplementedError - diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index e3ad323e06015..a640df7394bcf 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -2,8 +2,6 @@ from itertools import chain, ifilter, imap import operator -import logging - from pyspark.serializers import NoOpSerializer,\ BatchedSerializer, CloudPickleSerializer, pack_long from pyspark.rdd import _JavaStackTrace @@ -25,64 +23,86 @@ def count(self): """ #TODO make sure count implementation, thiis different from what pyspark does - return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1)) + return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1)) def _sum(self): """ """ - return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) def print_(self): """ + Since print is reserved name for python, we cannot make a print method function. + This function prints serialized data in RDD in DStream because Scala and Java cannot + deserialized pickled python object. Please use DStream.pyprint() instead to print result. + + Call DStream.print(). """ - # print is a reserved name of Python. We cannot give print to function name + #hack to call print function in DStream getattr(self._jdstream, "print")() def pyprint(self): """ + Print the first ten elements of each RDD generated in this DStream. This is an output + operator, so this DStream will be registered as an output stream and there materialized. + """ self._jdstream.pyprint() def filter(self, f): """ + Return DStream containing only the elements that satisfy predicate. """ def func(iterator): return ifilter(f, iterator) - return self.mapPartitions(func) + return self._mapPartitions(func) def flatMap(self, f, preservesPartitioning=False): """ + Pass each value in the key-value pair DStream through flatMap function + without changing the keys: this also retains the original RDD's partition. """ def func(s, iterator): return chain.from_iterable(imap(f, iterator)) - return self.mapPartitionsWithIndex(func, preservesPartitioning) + return self._mapPartitionsWithIndex(func, preservesPartitioning) - def map(self, f, preservesPartitioning=False): + def map(self, f): """ + Return DStream by applying a function to each element of DStream. """ def func(iterator): return imap(f, iterator) - return self.mapPartitions(func) - #return PipelinedDStream(self, func, preservesPartitioning) + return self._mapPartitions(func) - def mapPartitions(self, f): + def _mapPartitions(self, f): """ + Return a new DStream by applying a function to each partition of this DStream. """ def func(s, iterator): return f(iterator) - return self.mapPartitionsWithIndex(func) + return self._mapPartitionsWithIndex(func) - def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + def _mapPartitionsWithIndex(self, f, preservesPartitioning=False): """ - + Return a new DStream by applying a function to each partition of this DStream, + While tracking the index of the original partition. """ return PipelinedDStream(self, f, preservesPartitioning) - def reduce(self, func, numPartitions=None): + + def reduceByKey(self, func, numPartitions=None): """ + Merge the value for each key using an associative reduce function. + + This will also perform the merging locally on each mapper before + sending resuls to reducer, similarly to a "combiner" in MapReduce. + Output will be hash-partitioned with C{numPartitions} partitions, or + the default parallelism level if C{numPartitions} is not specified. """ return self.combineByKey(lambda x:x, func, func, numPartitions) def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions = None): """ + Count the number of elements for each key, and return the result to the + master as a dictionary """ if numPartitions is None: numPartitions = self._defaultReducePartitions() @@ -148,30 +168,27 @@ def add_shuffle_key(split, iterator): dstream._partitionFunc = partitionFunc return dstream - def mapPartitionsWithIndex(self, f, preservesPartitioning=False): - """ - - """ - return PipelinedDStream(self, f, preservesPartitioning) - def _defaultReducePartitions(self): """ + Returns the default number of partitions to use during reduce tasks (e.g., groupBy). + If spark.default.parallelism is set, then we'll use the value from SparkContext + defaultParallelism, otherwise we'll use the number of partitions in this RDD. + This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce + the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will + be inherent. """ - # hard code to avoid the error if self.ctx._conf.contains("spark.default.parallelism"): return self.ctx.defaultParallelism else: return self.getNumPartitions() def getNumPartitions(self): - """ - Returns the number of partitions in RDD - >>> rdd = sc.parallelize([1, 2, 3, 4], 2) - >>> rdd.getNumPartitions() - 2 - """ - return self._jdstream.partitions().size() + """ + Return the number of partitions in RDD + """ + # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have. + return 2 class PipelinedDStream(DStream): diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py index 06a169e5215ac..a7f1036e4b856 100644 --- a/python/pyspark/streaming/duration.py +++ b/python/pyspark/streaming/duration.py @@ -17,6 +17,7 @@ from pyspark.streaming import utils + class Duration(object): """ Duration for Spark Streaming application. Used to set duration diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py index 1aeb8e50375ed..49517b3e5c247 100644 --- a/python/pyspark/streaming/pyprint.py +++ b/python/pyspark/streaming/pyprint.py @@ -21,16 +21,22 @@ from pyspark.serializers import PickleSerializer + def collect(binary_file_path): + """ + Read pickled file written by SparkStreaming + """ dse = PickleSerializer() with open(binary_file_path, 'rb') as tempFile: for item in dse.load_stream(tempFile): yield item + + def main(): try: binary_file_path = sys.argv[1] except: - print "Missed FilePath in argement" + print "Missed FilePath in argements" if not binary_file_path: return @@ -43,5 +49,6 @@ def main(): print "..." break + if __name__ =="__main__": exit(main()) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index cfa336df8674f..a2b9d581f609c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -59,7 +59,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * operator, so this PythonDStream will be registered as an output stream and there materialized. * This function is for PythonAPI. */ - + //TODO move this function to PythonDStream def pyprint() = dstream.pyprint() /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index d305797bb4a0f..e2602117f3f86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -71,7 +71,9 @@ DStream[Array[Byte]](prev.ssc){ case Some(rdd)=>Some(rdd) val pairwiseRDD = new PairwiseRDD(rdd) /* - * This is equivalent to following python code + * Since python operation is executed by Scala after StreamingContext.start. + * What PairwiseDStream does is equivalent to following python code in pySpark. + * * with _JavaStackTrace(self.context) as st: * pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() * partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 67977244ef420..fc7a2055025c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -623,37 +623,36 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } -//TODO move pyprint to PythonDStream +//TODO move pyprint to PythonDStream and executed by py4j call back function /** * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output * operator, so this PythonDStream will be registered as an output stream and there materialized. * Since serialized Python object is readable by Python, pyprint writes out binary data to * temporary file and run python script to deserialized and print the first ten elements + * + * Currently call python script directly. We should avoid this */ private[streaming] def pyprint() { def foreachFunc = (rdd: RDD[T], time: Time) => { val iter = rdd.take(11).iterator - // make a temporary file + // Generate a temporary file val prefix = "spark" val suffix = ".tmp" val tempFile = File.createTempFile(prefix, suffix) val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath)) - //write out serialized python object + // Write out serialized python object to temporary file PythonRDD.writeIteratorToStream(iter, tempFileStream) tempFileStream.close() - // This value has to be passed from python - // Python currently does not do cluster deployment. But what happened + // pythonExec should be passed from python. Move pyprint to PythonDStream val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") - //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? - //absolute path to the python script is needed to change because we do not use pysparkstreaming + // Call python script to deserialize and print result in stdout val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath) val workerEnv = pb.environment() - //envVars also need to be pass - //workerEnv.putAll(envVars) + // envVars also should be pass from python val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH") workerEnv.put("PYTHONPATH", pythonPath) val worker = pb.start() @@ -665,7 +664,7 @@ abstract class DStream[T: ClassTag] ( println ("Time: " + time) println ("-------------------------------------------") - //print value from python std out + // Print values which is from python std out var line = "" breakable { while (true) { @@ -674,7 +673,7 @@ abstract class DStream[T: ClassTag] ( println(line) } } - //delete temporary file + // Delete temporary file tempFile.delete() println()