Skip to content

Commit

Permalink
Merge pull request apache#416 from tdas/filestream-fix
Browse files Browse the repository at this point in the history
Removed unnecessary DStream operations and updated docs

Removed StreamingContext.registerInputStream and registerOutputStream - they were useless. InputDStream has been made to register itself, and just registering a DStream as output stream cause RDD objects to be created but the RDDs will not be computed at all.. Also made DStream.register() private[streaming] for the same reasons.

Updated docs, specially added package documentation for streaming package.

Also, changed NetworkWordCount's input storage level to use MEMORY_ONLY, replication on the local machine causes warning messages (as replication fails) which is scary for a new user trying out his/her first example.
  • Loading branch information
pwendell committed Jan 14, 2014
2 parents 055be5c + f8bd828 commit 980250b
Show file tree
Hide file tree
Showing 21 changed files with 110 additions and 107 deletions.
32 changes: 17 additions & 15 deletions docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,37 @@ Add the following SBT or Maven dependency to your project to use Spark Streaming
artifactId = spark-streaming_{{site.SCALA_VERSION}}
version = {{site.SPARK_VERSION}}

For ingesting data from sources like Kafka and Flume, add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_VERSION}}` to the dependencies. For example, `spark-streaming-kafka_{{site.SCALA_VERSION}}` for Kafka, `spark-streaming-flume_{{site.SCALA_VERSION}}`, etc. Please refer to the [Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION}}%22) for the full list of supported sources / artifacts.

# Initializing Spark Streaming
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using

{% highlight scala %}
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}

The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are optional parameters, which need to be set when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).

This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
{% highlight scala %}
new SparkConf(conf, batchDuration)
{% endhighlight %}

where `conf` is a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
object used for more advanced configuration. In both cases, a [SparkContext](api/core/index.html#org.apache.spark.SparkContext) is created as well which can be accessed with `streamingContext.sparkContext`.

# Attaching Input Sources - InputDStreams
The StreamingContext is used to creating InputDStreams from input sources:
# Attaching Input Sources
The StreamingContext is used to creating input streams from data sources:

{% highlight scala %}
// Assuming ssc is the StreamingContext
ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory
ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
ssc.textFileStream(directory) // Creates a stream that monitors and processes new files in a HDFS directory
ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}

We also provide a input streams for Kafka, Flume, Akka actor, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#org.apache.spark.streaming.StreamingContext).


The core Spark Streaming API provides input streams for files, sockets, and Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section.

# DStream Operations
Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which writies data out to an external source.
Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to called, which write data out to an external data sink like a file system or a database.

## Transformations

Expand Down Expand Up @@ -234,7 +238,7 @@ wordCounts.print()
ssc.start()
{% endhighlight %}

The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
The `socketTextStream` returns a DStream of text data received from a TCP server socket. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.

To run this example on your local machine, you need to first run a Netcat server by using

Expand Down Expand Up @@ -270,14 +274,12 @@ hello world
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount
...
2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)

2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s)
...
{% endhighlight %}
</td>
Expand Down Expand Up @@ -384,7 +386,7 @@ A system that is required to operate 24/7 needs to be able tolerate the failure
1. The configuration of each DStream (checkpoint interval, etc.)
1. The RDD checkpoint files of each DStream

All this is periodically saved in the file `<checkpoint directory>/graph`. To recover, a new Streaming Context can be created with this directory by using
All this is periodically saved in the checkpoint directory. To recover, a new `StreamingContext` can be created with this directory by using

{% highlight scala %}
val ssc = new StreamingContext(checkpointDirectory)
Expand All @@ -395,7 +397,7 @@ On calling `ssc.start()` on this new context, the following steps are taken by t
1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it last checkpointed. This is also done for those time steps that were previously scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
1. Restart the network receivers, if any, and continue receiving new data.

In the current _alpha_ release, there are two different failure behaviors based on which input sources are used.
There are two different failure behaviors based on which input sources are used.

1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure.
1. _Using any input source that receives data through a network_ - The received input data is replicated in memory to multiple nodes. Since, all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible and driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

/**
* Counts words in text encoded with UTF8 received from the network every second.
Expand Down Expand Up @@ -48,7 +49,7 @@ object NetworkWordCount {

// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ object FlumeUtils {
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FlumeStreamSuite extends TestSuiteBase {
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
ssc.registerOutputStream(outputStream)
outputStream.register()
ssc.start()

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ object MQTTUtils {
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ object TwitterUtils {
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
ssc.registerInputStream(inputStream)
inputStream
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,7 @@ class StreamingContext private[streaming] (
*/
def networkStream[T: ClassTag](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
graph.addInputStream(inputStream)
inputStream
new PluggableInputDStream[T](this, receiver)
}

/**
Expand Down Expand Up @@ -259,9 +256,7 @@ class StreamingContext private[streaming] (
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): DStream[T] = {
val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
registerInputStream(inputStream)
inputStream
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

/**
Expand All @@ -280,9 +275,7 @@ class StreamingContext private[streaming] (
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
registerInputStream(inputStream)
inputStream
new RawInputDStream[T](this, hostname, port, storageLevel)
}

/**
Expand All @@ -300,9 +293,7 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
registerInputStream(inputStream)
inputStream
new FileInputDStream[K, V, F](this, directory)
}

/**
Expand All @@ -322,9 +313,7 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
registerInputStream(inputStream)
inputStream
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

/**
Expand Down Expand Up @@ -367,9 +356,7 @@ class StreamingContext private[streaming] (
oneAtATime: Boolean,
defaultRDD: RDD[T]
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
registerInputStream(inputStream)
inputStream
new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}

/**
Expand All @@ -390,21 +377,6 @@ class StreamingContext private[streaming] (
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}

/**
* Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
graph.addInputStream(inputStream)
}

/**
* Register an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}

/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
* receiving system events related to streaming.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.actorStream[T](props, name)
}

/**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
ssc.registerOutputStream(outputStream.dstream)
}

/**
* Creates a input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
* live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation
* existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
* While a Spark Streaming program is running, each DStream periodically generates a RDD,
* either from live data or by transforming the RDD generated by a parent DStream.
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
* DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
* or it can be generated by transforming existing DStreams using operations such as `map`,
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
* parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
Expand Down Expand Up @@ -523,7 +524,7 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

/**
Expand Down Expand Up @@ -590,8 +591,7 @@ abstract class DStream[T: ClassTag] (
if (first11.size > 10) println("...")
println()
}
val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

/**
Expand Down Expand Up @@ -768,8 +768,8 @@ abstract class DStream[T: ClassTag] (
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
def register(): DStream[T] = {
ssc.registerOutputStream(this)
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext}
import scala.reflect.ClassTag

/**
* This is the abstract base class for all input streams. This class provides to methods
* start() and stop() which called by the scheduler to start and stop receiving data/
* Input streams that can generated RDDs from new data just by running a service on
* the driver node (that is, without running a receiver onworker nodes) can be
* implemented by directly subclassing this InputDStream. For example,
* FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
* new files and generates RDDs on the new files. For implementing input streams
* that requires running a receiver on the worker nodes, use NetworkInputDStream
* as the parent class.
* This is the abstract base class for all input streams. This class provides methods
* start() and stop() which is called by Spark Streaming system to start and stop receiving data.
* Input streams that can generate RDDs from new data by running a service/thread only on
* the driver node (that is, without running a receiver on worker nodes), can be
* implemented by directly inheriting this InputDStream. For example,
* FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
* new files and generates RDDs with the new files. For implementing input streams
* that requires running a receiver on the worker nodes, use
* [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
*
* @param ssc_ Streaming context that will execute this input stream
*/
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {

var lastValidTime: Time = null
private[streaming] var lastValidTime: Time = null

ssc.graph.addInputStream(this)

/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}

/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
* that has to start a receiver on worker nodes to receive external data.
* Specific implementations of NetworkInputDStream must
* define the getReceiver() function that gets the receiver object of type
* [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
* to the workers to receive data.
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
Expand Down
Loading

0 comments on commit 980250b

Please sign in to comment.