
Commit

Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
witgo committed May 14, 2014
2 parents 03cc562 + 68f28da commit 9a5cfad
Showing 18 changed files with 130 additions and 95 deletions.
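
The common thread in the diffs below: the streaming examples stop taking a <master> argument (and stop passing SPARK_HOME and example jars into the context constructor), build a SparkConf instead, and have their run instructions switched from ad-hoc ./run / spark-submit commands to bin/run-example. A minimal sketch of the new construction pattern, assuming the Spark 1.0-era streaming API (the object name and app name below are illustrative, not from this commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExampleSkeleton {
  def main(args: Array[String]) {
    // The master URL is no longer hard-coded or read from args(0);
    // it is supplied by the launcher (bin/run-example or spark-submit).
    val sparkConf = new SparkConf().setAppName("ExampleSkeleton")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // ... build DStreams here ...
    ssc.start()
    ssc.awaitTermination()
  }
}
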
@@ -19,6 +19,7 @@

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
@@ -48,25 +49,23 @@
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.JavaCustomReceiver localhost 9999`
*/

public class JavaCustomReceiver extends Receiver<String> {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1");
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create a input stream with the custom receiver on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -33,10 +33,12 @@
* Your Flume AvroSink should be pointed to this address.
*
* Usage: JavaFlumeEventCount <host> <port>
*
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
*/
public final class JavaFlumeEventCount {
private JavaFlumeEventCount() {
@@ -56,7 +58,7 @@ public static void main(String[] args) {
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

flumeStream.count();

@@ -40,15 +40,15 @@

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
*
* Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
* zoo03 my-consumer-group topic1,topic2 1`
*/

@@ -24,7 +24,7 @@
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -41,8 +41,7 @@
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@@ -54,13 +53,17 @@ public static void main(String[] args) {
}

StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]));
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
@@ -130,11 +130,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
* `./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
* and then run the example
* `./bin/spark-submit examples.jar --class org.apache.spark.examples.streaming.ActorWordCount \
* 127.0.1.1 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
@@ -20,7 +20,7 @@ package org.apache.spark.examples.streaming
import java.io.{InputStreamReader, BufferedReader, InputStream}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
@@ -30,32 +30,27 @@ import org.apache.spark.streaming.receiver.Receiver
* Custom Receiver that receives data over a socket. Received bytes is interpreted as
* text and \n delimited lines are considered as records. They are then counted and printed.
*
* Usage: CustomReceiver <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
*/
object CustomReceiver {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
if (args.length < 2) {
System.err.println("Usage: CustomReceiver <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
val sparkConf = new SparkConf().setAppName("CustomReceiver")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a input stream with the custom receiver on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
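
The receiver implementation itself is collapsed in this diff. For context, a custom receiver extends org.apache.spark.streaming.receiver.Receiver, starts its own thread in onStart(), and hands records to Spark with store(). The sketch below is illustrative only; the class name and the exact error handling are assumptions, not the code in this file:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Logging {

  def onStart() {
    // Receive on a background thread so onStart() returns immediately.
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: the receiving thread exits once the receiver is stopped.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)  // push each \n-delimited record to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}

It would be wired in exactly as the diff shows: ssc.receiverStream(new SocketLineReceiver(host, port)).
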
@@ -31,14 +31,16 @@ import org.apache.spark.util.IntParam
* Your Flume AvroSink should be pointed to this address.
*
* Usage: FlumeEventCount <host> <port>
*
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
*/
object FlumeEventCount {
def main(args: Array[String]) {
if (args.length != 3) {
if (args.length < 2) {
System.err.println(
"Usage: FlumeEventCount <host> <port>")
System.exit(1)
@@ -49,8 +51,9 @@ object FlumeEventCount {
val Array(host, IntParam(port)) = args

val batchInterval = Milliseconds(2000)
val sparkConf = new SparkConf().setAppName("FlumeEventCount")

// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)

// Create a flume stream
@@ -27,8 +27,9 @@ import org.apache.spark.streaming.StreamingContext._
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.HdfsWordCount localdir`
* $ bin/run-example \
* org.apache.spark.examples.streaming.HdfsWordCount localdir
*
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
@@ -35,9 +35,9 @@ import org.apache.spark.SparkConf
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
def main(args: Array[String]) {
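
The body of KafkaWordCount is collapsed above. A simplified sketch of how the four documented arguments would typically feed KafkaUtils.createStream (this uses a plain reduceByKey rather than whatever windowing the real example does, so treat it as an illustration of the API, not the file's contents):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]) {
    // args: <zkQuorum> <group> <topics> <numThreads>
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // One topic map entry per topic; numThreads consumer threads each.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
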
@@ -75,14 +75,14 @@ object MQTTPublisher {
* Example Java code for Mqtt Publisher and Subscriber can be found here
* https://bitbucket.org/mkjinesh/mqttclient
* Usage: MQTTWordCount <MqttbrokerUrl> <topic>
\ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
* `$ bin/run-example \
* org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
* `$ bin/run-example \
* org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
*/
object MQTTWordCount {

@@ -23,16 +23,15 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

/**
* Counts words in text encoded with UTF8 received from the network every second.
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
@@ -42,13 +41,16 @@ object NetworkWordCount {
}

StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("NetworkWordCount");

// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a NetworkInputDStream on target ip:port and count the
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
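
On the storage-level change above: MEMORY_AND_DISK_SER keeps a single serialized copy of received blocks, which is fine for local runs but, as the new comment notes, offers no fault tolerance for receiver data on a cluster. For a distributed deployment one would presumably use a replicated variant (not part of this commit):

// Two serialized replicas of each received block, so data survives the loss of one executor.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER_2)
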
@@ -46,8 +46,7 @@ import org.apache.spark.util.IntParam
*
* and run the example as
*
* `$ ./bin/spark-submit examples.jar \
* --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
@@ -57,7 +56,7 @@ import org.apache.spark.util.IntParam
*
* To run this example in a local standalone cluster with automatic driver recovery,
*
* `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
* `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
* <path-to-examples-jar> \
* org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
* localhost 9999 ~/checkpoint ~/out`
@@ -81,7 +80,7 @@ object RecoverableNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a NetworkInputDStream on target ip:port and count the
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
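
The recovery logic this example exists to demonstrate sits in the collapsed portion of the file: the context is obtained through StreamingContext.getOrCreate, which restores a context (and its DStream lineage) from the checkpoint directory if one exists, or builds a fresh one otherwise. Schematically, inside main, with ip, port and checkpointDirectory taken from the arguments (names here are assumptions, not the literal code):

def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  ssc.checkpoint(checkpointDirectory)
  // ... socket stream, word counts and output setup go here ...
  ssc
}

// First run: builds a fresh context via createContext.
// After a driver restart: rebuilds the context and its DStreams from the checkpoint.
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  () => createContext(ip, port, checkpointDirectory))
ssc.start()
ssc.awaitTermination()
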
@@ -31,8 +31,8 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/spark-submit examples.jar
* --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
* `$ bin/run-example
* org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
@@ -51,7 +51,7 @@ object StatefulNetworkWordCount {
Some(currentCount + previousCount)
}

val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey")
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
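
The updateFunc fragment visible above feeds updateStateByKey, which carries a running count per word across batches and is the reason the example calls ssc.checkpoint("."). A short sketch of how the pieces fit together, assuming lines is the socket DStream created in the collapsed part of main:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
// Running count per word, maintained across batches via the checkpointed state.
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
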
@@ -28,13 +28,29 @@ import org.apache.spark.SparkConf
* stream. The stream is instantiated with credentials and optionally filters supplied by the
* command line arguments.
*
* Run this on your local machine as
*
*/
object TwitterPopularTags {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
"<access token> <access token secret> [<filters>]")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

val filters = args
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)

// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generat OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)
@@ -52,13 +68,13 @@

// Print popular hashtags
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(5)
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

topCounts10.foreachRDD(rdd => {
val topList = rdd.take(5)
val topList = rdd.take(10)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
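
The new scaladoc above says "Run this on your local machine as" without giving a command. Judging from the Usage line added in this diff and the pattern used by the other examples, the intended invocation is presumably `$ bin/run-example org.apache.spark.examples.streaming.TwitterPopularTags <consumer key> <consumer secret> <access token> <access token secret> [<filters>]`.
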
(Diffs for the remaining 4 changed files are not shown.)
