Merge pull request apache#7 from dashar/branch-2.0
Added a SparkHBase example along with Oozie files
Tom Graves committed Sep 29, 2016
2 parents c716ce9 + 10e829f commit 174ef88
Showing 4 changed files with 106 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pom.xml
@@ -13,6 +13,7 @@
<spark.scala.version>2.11</spark.scala.version>
<scala.version>2.11.8</scala.version>
<java.version>1.7</java.version>
<hbase.version>0.94.6.7.1304181342</hbase.version>
</properties>

<repositories>
@@ -72,6 +73,12 @@
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
9 changes: 9 additions & 0 deletions src/main/resources/oozie/hbase/job.properties
@@ -0,0 +1,9 @@
nameNode=hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020
jobTracker=axonitered-jt1.red.ygrid.yahoo.com:8032
queueName=default
# Enter your workflow root here
wfRoot=spark_oozie
# Parameterizes the HBase table name.
tableName=dashar123
oozie.libpath=/user/${user.name}/${wfRoot}/apps/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${wfRoot}/apps/spark
33 changes: 33 additions & 0 deletions src/main/resources/oozie/hbase/workflow.xml
@@ -0,0 +1,33 @@
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkHBaseViaOozieTest'>
    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>
    <credentials>
        <credential name="hbase.cert" type="hbase"></credential>
    </credentials>
    <start to="spark-node"/>
    <action name='spark-node' cred="hbase.cert">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark_latest,hbase_current,hbase_conf_reluxred</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkHBaseViaOozieTest</name>
            <class>com.yahoo.spark.starter.SparkClusterHBase</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--queue default --conf "spark.yarn.security.tokens.hbase.enabled=false" --conf "spark.yarn.security.tokens.hive.enabled=false"</spark-opts>
            <arg>${tableName}</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Script failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>
57 changes: 57 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/SparkClusterHBase.scala
@@ -0,0 +1,57 @@
package com.yahoo.spark.starter

import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, HColumnDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
import org.apache.spark.sql.SparkSession


// Simple example of accessing HBase from Spark.
object SparkClusterHBase {

  def main(args: Array[String]) {

    if (args == null || args.length < 1) {
      System.err.println("Usage: SparkClusterHBase <tableName>")
      System.exit(1)
    }

    // Use the Spark 2.0 API. On Spark 1.6.x, create the SparkConf and SparkContext as in the 1.6 examples.
    val spark = SparkSession
      .builder
      .appName("Spark HBase Example")
      .getOrCreate()

    val hconf = HBaseConfiguration.create()
    val tableName = s"spark_test:${args(0)}"
    hconf.set(TableInputFormat.INPUT_TABLE, tableName)
    val admin = new HBaseAdmin(hconf)

    // Create the table if it doesn't already exist.
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
      admin.createTable(tableDesc)
    }

    // Put a few rows of data into the table.
    val myTable = new HTable(hconf, tableName)
    for (i <- 0 to 5) {
      val p = new Put(("row" + i).getBytes())
      p.add("cf1".getBytes(), "column-1".getBytes(), ("value " + i).getBytes())
      myTable.put(p)
    }
    myTable.flushCommits()

    // Access the table through an RDD.
    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    println("HBase RDD count: " + count)

    spark.stop()
  }
}
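
As a usage note (not part of this commit), here is a minimal sketch of reading the written cells back out of the hBaseRDD defined above. The cf1/column-1 names match those used in the example, and Bytes is the standard HBase byte-conversion utility:

import org.apache.hadoop.hbase.util.Bytes

// Continuing from hBaseRDD above: map each (key, Result) pair to the row key and
// the value stored under cf1:column-1, then collect and print on the driver.
val rows = hBaseRDD.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val value = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("column-1")))
  (rowKey, value)
}
rows.collect().foreach { case (rowKey, value) => println(rowKey + " -> " + value) }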
