From 4e497db8f3826cf5142b2165a08d02c6f3c2cd90 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 13 Jan 2014 23:23:46 -0800 Subject: [PATCH 1/2] Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation. --- docs/streaming-programming-guide.md | 32 ++++++++------- .../streaming/examples/NetworkWordCount.scala | 3 +- .../spark/streaming/flume/FlumeUtils.scala | 1 - .../streaming/flume/FlumeStreamSuite.scala | 2 +- .../spark/streaming/kafka/KafkaUtils.scala | 4 +- .../spark/streaming/mqtt/MQTTUtils.scala | 4 +- .../streaming/twitter/TwitterUtils.scala | 4 +- .../spark/streaming/StreamingContext.scala | 40 +++---------------- .../api/java/JavaStreamingContext.scala | 7 ---- .../spark/streaming/dstream/DStream.scala | 25 +++++++----- .../streaming/dstream/InputDStream.scala | 24 ++++++----- .../dstream/NetworkInputDStream.scala | 9 +++-- .../dstream/PairDStreamFunctions.scala | 5 +++ .../org/apache/spark/streaming/package.scala | 38 ++++++++++++++++++ .../streaming/util/MasterFailureTest.scala | 2 +- .../spark/streaming/JavaTestUtils.scala | 3 +- .../streaming/BasicOperationsSuite.scala | 1 - .../spark/streaming/CheckpointSuite.scala | 2 +- .../spark/streaming/InputStreamsSuite.scala | 8 ++-- .../streaming/StreamingContextSuite.scala | 1 - .../spark/streaming/TestSuiteBase.scala | 7 +--- 21 files changed, 115 insertions(+), 107 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/package.scala diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 4e8a680a75d07..1495af2267b38 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -21,6 +21,8 @@ Add the following SBT or Maven dependency to your project to use Spark Streaming artifactId = spark-streaming_{{site.SCALA_VERSION}} version = {{site.SPARK_VERSION}} +For ingesting data from sources like Kafka and Flume, add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_VERSION}}` to the dependencies. For example, `spark-streaming-kafka_{{site.SCALA_VERSION}}` for Kafka, `spark-streaming-flume_{{site.SCALA_VERSION}}`, etc. Please refer to the [Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION}}%22) for the full list of supported sources / artifacts. + # Initializing Spark Streaming The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using @@ -28,26 +30,28 @@ The first thing a Spark Streaming program must do is create a `StreamingContext` new StreamingContext(master, appName, batchDuration, [sparkHome], [jars]) {% endhighlight %} -The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. 
Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster). +The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is the name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are optional parameters, which need to be set when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster). Alternatively, a `StreamingContext` can be created by using -This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`. +{% highlight scala %} +new StreamingContext(conf, batchDuration) +{% endhighlight %} +where `conf` is a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) +object used for more advanced configuration. In both cases, a [SparkContext](api/core/index.html#org.apache.spark.SparkContext) is created as well which can be accessed with `streamingContext.sparkContext`. -# Attaching Input Sources - InputDStreams -The StreamingContext is used to creating InputDStreams from input sources: +# Attaching Input Sources +The StreamingContext is used to create input streams from data sources: {% highlight scala %} // Assuming ssc is the StreamingContext -ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory -ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port +ssc.textFileStream(directory) // Creates a stream that monitors and processes new files in a HDFS directory +ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port {% endhighlight %} -We also provide a input streams for Kafka, Flume, Akka actor, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#org.apache.spark.streaming.StreamingContext). - - +The core Spark Streaming API provides input streams for files, sockets, Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section. # DStream Operations -Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which writies data out to an external source. +Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. 
After applying a sequence of transformations to the input streams, output operations need to called, which writes data out to an external data sink like a file system or a database. ## Transformations @@ -234,7 +238,7 @@ wordCounts.print() ssc.start() {% endhighlight %} -The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second. +The `socketTextStream` returns a DStream of text data received from a TCP server socket. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second. To run this example on your local machine, you need to first run a Netcat server by using @@ -270,14 +274,12 @@ hello world {% highlight bash %} # TERMINAL 2: RUNNING NetworkWordCount ... -2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) -2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s) ... {% endhighlight %} @@ -384,7 +386,7 @@ A system that is required to operate 24/7 needs to be able tolerate the failure 1. The configuration of each DStream (checkpoint interval, etc.) 1. The RDD checkpoint files of each DStream -All this is periodically saved in the file `/graph`. To recover, a new Streaming Context can be created with this directory by using +All this is periodically saved in the checkpoint directory. To recover, a new `StreamingContext` can be created with this directory by using {% highlight scala %} val ssc = new StreamingContext(checkpointDirectory) @@ -395,7 +397,7 @@ On calling `ssc.start()` on this new context, the following steps are taken by t 1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it last checkpointed. This is also done for those time steps that were previously scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc. 1. Restart the network receivers, if any, and continue receiving new data. -In the current _alpha_ release, there are two different failure behaviors based on which input sources are used. +There are two different failure behaviors based on which input sources are used. 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure. 1. _Using any input source that receives data through a network_ - The received input data is replicated in memory to multiple nodes. Since, all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible and driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely. 
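To make the linking step described at the top of the guide concrete, here is a minimal sbt sketch of the dependency block it calls for. This is an illustration, not text from the patch: the `sparkVersion` value is a placeholder standing in for `{{site.SPARK_VERSION}}`, and `%%` is used so that sbt appends the `_{{site.SCALA_VERSION}}` suffix to the artifact names.

{% highlight scala %}
// Hypothetical sbt settings; the artifact ids follow the spark-streaming-xyz pattern above.
val sparkVersion = "x.y.z"  // placeholder for the Spark release in use

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Add only the connectors whose sources are actually used:
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)
{% endhighlight %}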
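Pulling the guide's pieces together, the following is a compact sketch of the flow just described: build the context (here via the `SparkConf` variant), attach an input stream, transform it, and apply an output operation before starting the computation. The master URL `local[2]`, the application name, and port 9999 are illustrative values, mirroring the Netcat example above.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit conversions for pair DStream operations

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingGuideSketch")
val ssc = new StreamingContext(conf, Seconds(5))       // 5 second batches, a conservative starting point

val lines = ssc.socketTextStream("localhost", 9999)    // input stream from a TCP source
val words = lines.flatMap(_.split(" "))                // transformation: lines -> words
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()                                     // output operation: materializes the stream

ssc.start()
{% endhighlight %}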
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index 25f7013307fef..02264757123db 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.storage.StorageLevel /** * Counts words in text encoded with UTF8 received from the network every second. @@ -48,7 +49,7 @@ object NetworkWordCount { // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt) + val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index d53b66dd46771..654ba451e72fb 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -37,7 +37,6 @@ object FlumeUtils { storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) - ssc.registerInputStream(inputStream) inputStream } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 2e8e9fac45553..8bc43972ab6a0 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -43,7 +43,7 @@ class FlumeStreamSuite extends TestSuiteBase { val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 37c03be4e77ad..15a2daa008ad4 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -71,9 +71,7 @@ object KafkaUtils { topics: Map[String, Int], storageLevel: StorageLevel ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) } /** diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 3636e46bb8257..1b09ee5dc8f65 100644 --- 
a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -37,9 +37,7 @@ object MQTTUtils { topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[String] = { - val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) } /** diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index b8bae7b6d3385..e8433b7e9f6fa 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -41,9 +41,7 @@ object TwitterUtils { filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7b279334034ab..26257e652e537 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -199,10 +199,7 @@ class StreamingContext private[streaming] ( */ def networkStream[T: ClassTag]( receiver: NetworkReceiver[T]): DStream[T] = { - val inputStream = new PluggableInputDStream[T](this, - receiver) - graph.addInputStream(inputStream) - inputStream + new PluggableInputDStream[T](this, receiver) } /** @@ -259,9 +256,7 @@ class StreamingContext private[streaming] ( converter: (InputStream) => Iterator[T], storageLevel: StorageLevel ): DStream[T] = { - val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) - registerInputStream(inputStream) - inputStream + new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } /** @@ -280,9 +275,7 @@ class StreamingContext private[streaming] ( port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { - val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream + new RawInputDStream[T](this, hostname, port, storageLevel) } /** @@ -300,9 +293,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, directory) } /** @@ -322,9 +313,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } /** @@ -367,9 +356,7 @@ class StreamingContext private[streaming] ( oneAtATime: Boolean, defaultRDD: RDD[T] 
): DStream[T] = { - val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) - registerInputStream(inputStream) - inputStream + new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } /** @@ -390,21 +377,6 @@ class StreamingContext private[streaming] ( new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } - /** - * Register an input stream that will be started (InputDStream.start() called) to get the - * input data. - */ - def registerInputStream(inputStream: InputDStream[_]) { - graph.addInputStream(inputStream) - } - - /** - * Register an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: DStream[_]) { - graph.addOutputStream(outputStream) - } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for * receiving system events related to streaming. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 108950466ac12..4edf8fa13a205 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -319,13 +319,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.actorStream[T](props, name) } - /** - * Registers an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { - ssc.registerOutputStream(outputStream.dstream) - } - /** * Creates a input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 9dfcc08abea95..299628ce9fa21 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see - * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from - * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation - * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. - * While a Spark Streaming program is running, each DStream periodically generates a RDD, - * either from live data or by transforming the RDD generated by a parent DStream. + * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). + * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS) + * or it can be generated by transforming existing DStreams using operations such as `map`, + * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream + * periodically generates a RDD, either from live data or by transforming the RDD generated by a + * parent DStream. * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and * `window`. 
In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains @@ -53,6 +54,8 @@ import org.apache.spark.streaming.Duration * - A list of other DStreams that the DStream depends on * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval + * + * There are two types of DStream operations - __transformations__ */ abstract class DStream[T: ClassTag] ( @@ -519,7 +522,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -586,8 +589,7 @@ abstract class DStream[T: ClassTag] ( if (first11.size > 10) println("...") println() } - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -760,7 +762,10 @@ abstract class DStream[T: ClassTag] ( this.foreachRDD(saveFunc) } - def register() { - ssc.registerOutputStream(this) + /** + * Register this DStream as an output DStream. + */ + private[streaming] def register() { + ssc.graph.addOutputStream(this) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index a1075ad304ef6..27303390d9e03 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag /** - * This is the abstract base class for all input streams. This class provides to methods - * start() and stop() which called by the scheduler to start and stop receiving data/ - * Input streams that can generated RDDs from new data just by running a service on - * the driver node (that is, without running a receiver onworker nodes) can be - * implemented by directly subclassing this InputDStream. For example, - * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for - * new files and generates RDDs on the new files. For implementing input streams - * that requires running a receiver on the worker nodes, use NetworkInputDStream - * as the parent class. + * This is the abstract base class for all input streams. This class provides methods + * start() and stop() which are called by the Spark Streaming system to start and stop receiving data. + * Input streams that can generate RDDs from new data by running a service/thread only on + * the driver node (that is, without running a receiver on worker nodes), can be + * implemented by directly inheriting this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for + * new files and generates RDDs with the new files. For implementing input streams + * that require running a receiver on the worker nodes, use + * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class. 
+ * + * @param ssc_ Streaming context that will execute this input stream */ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { - var lastValidTime: Time = null + private[streaming] var lastValidTime: Time = null + + ssc.graph.addInputStream(this) /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0f1f6fc2cec4a..ce153f065d35a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} /** - * Abstract class for defining any InputDStream that has to start a receiver on worker - * nodes to receive external data. Specific implementations of NetworkInputDStream must + * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] + * that has to start a receiver on worker nodes to receive external data. + * Specific implementations of NetworkInputDStream must * define the getReceiver() function that gets the receiver object of type - * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive - * data. + * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent + * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 6b3e48382e0c4..f57762321c40e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.{Time, Duration} +/** + * Extra functions available on DStream of (key, value) pairs through an implicit conversion. + * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use + * these functions. + */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala new file mode 100644 index 0000000000000..4dd985cf5a178 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main + * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data + * type representing a continuous sequence of RDDs, representing a continuous stream of data. + * + * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations + * available only on DStreams + * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically + * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit + * conversions when you `import org.apache.spark.streaming.StreamingContext._`. + * + * For the Java API of Spark Streaming, take a look at the + * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and + * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the + * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality. + */ +package object streaming { + // For package docs only +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index be67af3a6466a..54813934b8c95 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -191,7 +191,7 @@ object MasterFailureTest extends Logging { val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) val outputStream = new TestOutputStream(operatedStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 42ab9590d6df5..33f6df8f88177 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) } @@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) - dstream.dstream.ssc.registerOutputStream(ostream) + ostream.register() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 7037aae234208..cb53555f5c364 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -379,7 +379,6 @@ class BasicOperationsSuite extends TestSuiteBase { val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) - ssc.registerInputStream(stream) stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0c68c44ddb6da..89daf4758661b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase { val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files and advance manual clock to process them diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a8e053278a2b2..95bf40ba75956 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it @@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Let the data from the receiver be received diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f7f3346f81db5..717da8e00462b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => (1 to i)) val inputStream = new TestInputStream(s, input, 1) - s.registerInputStream(inputStream) inputStream } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 535e5bd1f1f2e..201630672ab4c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } @@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } From f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 14 Jan 2014 00:03:46 -0800 Subject: [PATCH 2/2] Fixed loose ends in docs. --- docs/streaming-programming-guide.md | 4 ++-- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1495af2267b38..07c4c55633929 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -48,10 +48,10 @@ ssc.textFileStream(directory) // Creates a stream that monitors and processes ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port {% endhighlight %} -The core Spark Streaming API provides input streams for files, sockets, Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section. +The core Spark Streaming API provides input streams for files, sockets, and Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section. # DStream Operations -Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to called, which writes data out to an external data sink like a file system or a database. +Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. 
Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to be called, which write data out to an external data sink like a file system or a database. ## Transformations diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 844316a1c7b93..71a4c5c93e76a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -54,8 +54,6 @@ import org.apache.spark.streaming.Duration * - A list of other DStreams that the DStream depends on * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval - * - * There are two types of DStream operations - __transformations__ */ abstract class DStream[T: ClassTag] (
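As a usage-level summary of the API change in this pull request (a sketch, not code taken from the patch): input streams now add themselves to the `StreamingContext`'s graph when they are constructed, and output streams are registered by the output operations themselves, so neither `registerInputStream` nor `registerOutputStream` appears in user code. The host, port, and batch interval below are illustrative.

{% highlight scala %}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "AfterThisPatch", Seconds(1))

// The returned input stream registered itself in its constructor, so the old
// ssc.registerInputStream(...) call is no longer needed (and no longer exists).
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_SER)

// Output operations such as print() and foreachRDD() call the now-private
// DStream.register() internally, replacing ssc.registerOutputStream(...).
lines.count().print()

ssc.start()
{% endhighlight %}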
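For implementers of new sources, here is a sketch of a driver-only input stream that relies on the same self-registration, assuming the `InputDStream` API shown in the diff (`start()`, `stop()`, `compute(validTime)`); the class and its values are hypothetical. Because the `InputDStream` constructor now calls `ssc.graph.addInputStream(this)`, constructing the stream is enough to wire it into the context.

{% highlight scala %}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Emits the same small RDD every batch interval, generated entirely on the driver.
class ConstantSeqInputDStream(ssc_ : StreamingContext, values: Seq[Int])
  extends InputDStream[Int](ssc_) {

  override def start() { }  // no receiver or background service to start
  override def stop() { }

  override def compute(validTime: Time): Option[RDD[Int]] = {
    Some(ssc_.sparkContext.makeRDD(values))
  }
}

// Usage: constructing it registers it; an output operation still has to be applied.
// val constants = new ConstantSeqInputDStream(ssc, 1 to 10)
{% endhighlight %}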