[YSPARK-414] Build spark-starter for yspark-2.0

This change adds a Scala word count example that writes its output to disk, updates the Java word count for Spark 2.0, and adds run examples to the README.
Dhruve Ashar committed Jun 29, 2016
1 parent 08b7d03 commit c716ce9
Showing 5 changed files with 115 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
+target/
+.idea/
+spark-starter.iml
29 changes: 24 additions & 5 deletions README.md
@@ -13,21 +13,40 @@ mvn clean package
Jars are located in <project-root>/target. Two jars will be created in <project-root>/target, one jar packaged with all dependencies:

```
-spark-starter-1.0-SNAPSHOT-jar-with-dependencies.jar
+spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar
```

and another packaged with just the example code:

```
-spark-starter-1.0-SNAPSHOT.jar
+spark-starter-2.0-SNAPSHOT.jar
```

Example of how to run on grid:

+> Ensure that ```SPARK_HOME``` and ```SPARK_CONF_DIR``` are set on the gateway or launcher box.
+* Running word count example in java.
```
spark-submit --class com.yahoo.spark.starter.SparkPi \
---master yarn --deploy-mode cluster --executor-memory 3g --queue default --num-executors 3 --driver-memory 3g \
-$SPARK_STARTER_HOME/target/spark-starter-1.0-SNAPSHOT-jar-with-dependencies.jar
-```
+--master yarn \
+--deploy-mode cluster \
+--executor-memory 3g \
+--queue default \
+--num-executors 3 \
+--driver-memory 3g \
+$SPARK_STARTER_HOME/target/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar
+```
+
+* Running word count example in scala which writes output to provided uri.
+```
+spark-submit --class com.yahoo.spark.starter.ScalaWordCount \
+--master yarn \
+--deploy-mode cluster \
+--queue default \
+--driver-memory 5g \
+$SPARK_STARTER_HOME/target/spark-starter-2.0-SNAPSHOT.jar largeRandomText/chunk[1-3]/* largeRandomText/output/
+```

For more details please reference

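As a quick sanity check after the Scala job finishes, the saved counts can be read back from the output URI. A minimal spark-shell sketch, assuming the output path used in the submit example above (saveAsTextFile writes one "(word,count)" pair per line):

```scala
// Spot-check the saved word counts in spark-shell, where `sc` is predefined.
val saved = sc.textFile("largeRandomText/output/part-*")
saved.take(10).foreach(println)  // prints lines like "(spark,42)"
```
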
18 changes: 14 additions & 4 deletions pom.xml
@@ -6,12 +6,13 @@

    <groupId>com.yahoo.spark.starter</groupId>
    <artifactId>spark-starter</artifactId>
-   <version>1.0-SNAPSHOT</version>
+   <version>2.0-SNAPSHOT</version>

    <properties>
-       <spark.version.full>1.5.1.1_2.6.0.16.1506060127_1510272107</spark.version.full>
-       <spark.scala.version>2.10</spark.scala.version>
-       <scala.version>2.10.4</scala.version>
+       <spark.version.full>2.0.0.10</spark.version.full>
+       <spark.scala.version>2.11</spark.scala.version>
+       <scala.version>2.11.8</scala.version>
+       <java.version>1.7</java.version>
    </properties>

    <repositories>
@@ -75,6 +76,15 @@

    <build>
        <plugins>
+           <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-compiler-plugin</artifactId>
+               <version>3.5.1</version>
+               <configuration>
+                   <source>${java.version}</source>
+                   <target>${java.version}</target>
+               </configuration>
+           </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
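Note: the new maven-compiler-plugin block pins the javac source/target level to the java.version property (1.7) added above, so compilation no longer depends on the default language level of whichever JDK runs the build.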
80 changes: 43 additions & 37 deletions src/main/java/com/yahoo/spark/starter/JavaWordCount.java
@@ -1,58 +1,64 @@
package com.yahoo.spark.starter;

import scala.Tuple2;
-import org.apache.spark.SparkConf;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

// A simple Word Count example in Java.
public final class JavaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
+    private static final Pattern SPACE = Pattern.compile(" ");

-  public static void main(String[] args) throws Exception {
+    public static void main(String[] args) throws Exception {

-    if (args.length < 1) {
-      System.err.println("Usage: JavaWordCount <file>");
-      System.exit(1);
-    }
+        if (args.length < 1) {
+            System.err.println("Usage: JavaWordCount <file>");
+            System.exit(1);
+        }

-    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
-    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-    JavaRDD<String> lines = ctx.textFile(args[0], 1);
+        SparkSession spark = SparkSession
+            .builder()
+            .appName("JavaWordCount")
+            .getOrCreate();

-    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String s) {
-        return Arrays.asList(SPACE.split(s));
-      }
-    });
+        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

-    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
-      @Override
-      public Tuple2<String, Integer> call(String s) {
-        return new Tuple2<String, Integer>(s, 1);
-      }
-    });
+        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+            @Override
+            public Iterator<String> call(String s) {
+                return Arrays.asList(SPACE.split(s)).iterator();
+            }
+        });

-    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer i1, Integer i2) {
-        return i1 + i2;
-      }
-    });
+        JavaPairRDD<String, Integer> ones = words.mapToPair(
+            new PairFunction<String, String, Integer>() {
+                @Override
+                public Tuple2<String, Integer> call(String s) {
+                    return new Tuple2<>(s, 1);
+                }
+            });

-    List<Tuple2<String, Integer>> output = counts.collect();
-    for (Tuple2<?, ?> tuple : output) {
-      System.out.println(tuple._1() + ": " + tuple._2());
-    }
-    ctx.stop();
+        JavaPairRDD<String, Integer> counts = ones.reduceByKey(
+            new Function2<Integer, Integer, Integer>() {
+                @Override
+                public Integer call(Integer i1, Integer i2) {
+                    return i1 + i2;
+                }
+            });
+
+        List<Tuple2<String, Integer>> output = counts.collect();
+        for (Tuple2<?,?> tuple : output) {
+            System.out.println(tuple._1() + ": " + tuple._2());
+        }
+        spark.stop();
    }
}
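Two API changes in Spark 2.0 drive this rewrite: SparkSession replaces the SparkConf/JavaSparkContext pair as the entry point, and FlatMapFunction.call now returns an Iterator instead of an Iterable. For comparison, a minimal Scala sketch of the same pipeline against the 2.0 entry point might look like this (illustrative only; SessionWordCount is not part of the commit):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: word count via the Spark 2.0 SparkSession entry point.
object SessionWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SessionWordCount").getOrCreate()
    val counts = spark.read.textFile(args(0)).rdd  // Dataset[String] -> RDD[String]
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach { case (word, count) => println(s"$word: $count") }
    spark.stop()
  }
}
```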

31 changes: 31 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/ScalaWordCount.scala
@@ -0,0 +1,31 @@
+package com.yahoo.spark.starter
+
+import org.apache.spark._
+
+// Simple example of Word Count in Scala
+object ScalaWordCount {
+  def main(args: Array[String]) {
+
+    if (args.length < 2) {
+      System.err.println("Usage: ScalaWordCount <inputFilesURI> <outputFilesUri>")
+      System.exit(1)
+    }
+
+    val conf = new SparkConf().setAppName("Scala Word Count")
+    val sc = new SparkContext(conf)
+
+    // get the input file uri
+    val inputFilesUri = args(0)
+
+    // get the output file uri
+    val outputFilesUri = args(1)
+
+    val textFile = sc.textFile(inputFilesUri)
+    val counts = textFile.flatMap(line => line.split(" "))
+                 .map(word => (word, 1))
+                 .reduceByKey(_ + _)
+    counts.saveAsTextFile(outputFilesUri)
+
+    sc.stop()
+  }
+}
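Note that ScalaWordCount uses the classic SparkContext API, which remains fully supported in Spark 2.0; only the Java example was migrated to the new SparkSession entry point in this change. Since counts is an RDD[(String, Int)], saveAsTextFile serializes each tuple via toString, which is why the output part files contain lines of the form (word,count).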
