Quick Start Guide
You can follow the instructions here
Download a Spark distribution. As of this writing, we have tested with spark-1.5.2.
We have included a sample of the TPCH scale1 dataset in the quickstart folder. It contains a sample of facts from the datascale1 dataset. Note that the rows in the flattened dataset and in the original lineitem table may not be the same, so don't use these datasets to compare result values.
Follow the instructions here to create the Druid Index for the flattened dataset. Copy and edit tpch_index_task.json.template; in particular, change the baseDir location in your configuration to match your setup.
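If you prefer to script that edit, here is a minimal Scala sketch. It assumes the template stores the location as a JSON string under a "baseDir" key and that the value contains no escaped quotes; the object and method names are hypothetical and not part of SparklineData or Druid.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.util.matching.Regex

// Hypothetical helper: rewrites the "baseDir" value in an index task template.
object IndexTaskTemplate {
  // Group 1 captures `"baseDir" : "`, group 2 the closing quote; the old value is dropped.
  // Assumption: the value contains no escaped double quotes.
  private val BaseDirPattern: Regex = "(\"baseDir\"\\s*:\\s*\")[^\"]*(\")".r

  /** Returns `template` with every "baseDir" string value replaced by `newBaseDir`. */
  def withBaseDir(template: String, newBaseDir: String): String =
    BaseDirPattern.replaceAllIn(template, (m: Regex.Match) =>
      // quoteReplacement keeps '$' and '\' in paths from being read as group references
      Regex.quoteReplacement(m.group(1) + newBaseDir + m.group(2)))

  /** Reads the template file, substitutes baseDir, and writes the task to `outputPath`. */
  def writeTask(templatePath: String, outputPath: String, newBaseDir: String): Unit = {
    val template = new String(Files.readAllBytes(Paths.get(templatePath)), StandardCharsets.UTF_8)
    Files.write(Paths.get(outputPath), withBaseDir(template, newBaseDir).getBytes(StandardCharsets.UTF_8))
  }
}
```

For example, `IndexTaskTemplate.writeTask("tpch_index_task.json.template", "tpch_index_task.json", "/path/to/quickstart")` writes a copy with the new location; the output file name is only an illustration.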
When starting the spark-shell, include the SparklineData packages, and then set up the DruidPlanner. For example, here is how you can start the spark-shell:
bin/spark-shell \
--packages com.databricks:spark-csv_2.10:1.1.0,SparklineData:spark-datetime:0.0.2,SparklineData:spark-druid-olap:0.0.2
In the shell, issue the following setup statements. They register the SparklineData datetime functions and the rewrite rules of the SparklineData DruidPlanner with the Spark SQLContext:
import org.sparklinedata.spark.dateTime.Functions._
import org.apache.spark.sql.sources.druid.DruidPlanner
Set up the datasets in Spark:
- Change the path below to point to your folder for the orderLineItemPartSupplierCustomer flattened dataset.
sql("""CREATE TEMPORARY TABLE orderLineItemPartSupplierBase(
  o_orderkey integer, o_custkey integer, o_orderstatus string, o_totalprice double,
  o_orderdate string, o_orderpriority string, o_clerk string, o_shippriority integer,
  o_comment string,
  l_partkey integer, l_suppkey integer, l_linenumber integer, l_quantity double,
  l_extendedprice double, l_discount double, l_tax double, l_returnflag string,
  l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string,
  l_shipinstruct string, l_shipmode string, l_comment string,
  order_year string,
  ps_partkey integer, ps_suppkey integer, ps_availqty integer, ps_supplycost double,
  ps_comment string,
  s_name string, s_address string, s_phone string, s_acctbal double, s_comment string,
  s_nation string, s_region string,
  p_name string, p_mfgr string, p_brand string, p_type string, p_size integer,
  p_container string, p_retailprice double, p_comment string,
  c_name string, c_address string, c_phone string, c_acctbal double,
  c_mktsegment string, c_comment string, c_nation string, c_region string)
USING com.databricks.spark.csv
OPTIONS (path "<location of orderLineItemPartSupplierCustomer>",
  header "false", delimiter "|")""")
// Now create the orderLineItemPartSupplier DataFrame that links the raw data with the Druid Index
sql("""CREATE TEMPORARY TABLE orderLineItemPartSupplier
USING org.sparklinedata.druid
OPTIONS (sourceDataframe "orderLineItemPartSupplierBase",
timeDimensionColumn "l_shipdate",
druidDatasource "tpch",
druidHost "localhost",
druidPort "8082",
columnMapping '{ "l_quantity" : "sum_l_quantity", "ps_availqty" : "sum_ps_availqty", "cn_name" : "c_nation", "cr_name" : "c_region", "sn_name" : "s_nation", "sr_name" : "s_region" } ',
functionalDependencies '[ {"col1" : "c_name", "col2" : "c_address", "type" : "1-1"}, {"col1" : "c_phone", "col2" : "c_address", "type" : "1-1"}, {"col1" : "c_name", "col2" : "c_mktsegment", "type" : "n-1"}, {"col1" : "c_name", "col2" : "c_comment", "type" : "1-1"}, {"col1" : "c_name", "col2" : "c_nation", "type" : "n-1"}, {"col1" : "c_nation", "col2" : "c_region", "type" : "n-1"} ] ',
starSchema ' { "factTable" : "orderLineItemPartSupplier", "relations" : [] } ')""")
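Our reading of the columnMapping and functionalDependencies options, sketched in plain Scala: columnMapping translates a source DataFrame column name into the Druid column that holds it, and each functional dependency states that col1 determines col2 ("n-1") or that the two columns determine each other ("1-1"). The object and its methods are hypothetical illustrations, not SparklineData APIs, and we assume that columns absent from the mapping keep the same name in Druid.

```scala
// Hypothetical model of two of the OPTIONS above; not SparklineData code.
object DruidOptionsSketch {
  // Transcribed from the columnMapping option: source DataFrame column -> Druid column.
  val columnMapping: Map[String, String] = Map(
    "l_quantity" -> "sum_l_quantity",
    "ps_availqty" -> "sum_ps_availqty",
    "cn_name" -> "c_nation",
    "cr_name" -> "c_region",
    "sn_name" -> "s_nation",
    "sr_name" -> "s_region")

  // Assumption: an unmapped column has the same name in Spark and in Druid.
  def druidColumn(sparkColumn: String): String =
    columnMapping.getOrElse(sparkColumn, sparkColumn)

  // One entry of functionalDependencies ("type" is a Scala keyword, hence depType).
  final case class FunctionalDependency(col1: String, col2: String, depType: String)

  /** Checks a dependency on sample rows: "n-1" means each col1 value has exactly one
    * col2 value; "1-1" also requires each col2 value to have exactly one col1 value. */
  def holds(fd: FunctionalDependency, rows: Seq[Map[String, String]]): Boolean = {
    def determines(from: String, to: String): Boolean =
      rows.groupBy(row => row(from)).values
        .forall(group => group.map(row => row(to)).distinct.size == 1)
    fd.depType match {
      case "n-1" => determines(fd.col1, fd.col2)
      case "1-1" => determines(fd.col1, fd.col2) && determines(fd.col2, fd.col1)
      case other => throw new IllegalArgumentException(s"unknown dependency type: $other")
    }
  }
}
```

For instance, two customers in the same nation satisfy the declared "n-1" dependency from c_name to c_nation, but would violate it if it were declared "1-1".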
Now you can run queries against orderLineItemPartSupplier; they will be rewritten to use the Druid Index. For example:
sql("""select l_returnflag as r, l_linestatus as ls,
count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a
from orderLineItemPartSupplier
group by l_returnflag, l_linestatus
order by s, ls, r
limit 3""").show()
You can compare this with the same query run, without rewriting, against the raw flattened table:
sql("""select l_returnflag as r, l_linestatus as ls,
count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a
from orderLineItemPartSupplierBase
group by l_returnflag, l_linestatus
order by s, ls, r
limit 3""").show()
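To make explicit what both versions of the query compute, whichever engine evaluates it, here is the same grouping written over plain Scala collections. The Row and Result classes are hypothetical stand-ins for the table rows and the result rows; only the columns the query reads are modeled.

```scala
// Hypothetical in-memory model of the aggregation query; not Spark or Druid code.
object ReturnFlagSummary {
  // Only the columns the query reads from the flattened table.
  final case class Row(lReturnflag: String, lLinestatus: String,
                       lExtendedprice: Double, psSupplycost: Double, psAvailqty: Int)

  // One output row: r, ls, count(*), s, m, a
  final case class Result(r: String, ls: String, count: Long, s: Double, m: Double, a: Double)

  def summarize(rows: Seq[Row]): Seq[Result] =
    rows.groupBy(row => (row.lReturnflag, row.lLinestatus)).toSeq // group by l_returnflag, l_linestatus
      .map { case ((r, ls), group) =>
        Result(r, ls,
          count = group.size.toLong,                                  // count(*)
          s = group.map(_.lExtendedprice).sum,                        // sum(l_extendedprice)
          m = group.map(_.psSupplycost).max,                          // max(ps_supplycost)
          a = group.map(_.psAvailqty.toDouble).sum / group.size)      // avg(ps_availqty)
      }
      .sortBy(res => (res.s, res.ls, res.r)) // order by s, ls, r (ascending)
      .take(3)                                // limit 3
}
```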
- Set TPCH_BASE_DIR appropriately. In the following, it is set to $HOME/tpch/datascale1:
val TPCH_BASE_DIR = sys.env("HOME") + "/tpch/datascale1"
def tpchDataFolder(tableName : String) = s"$TPCH_BASE_DIR/$tableName/"
sql(s"""CREATE TEMPORARY TABLE lineitembase(l_orderkey integer,
l_partkey integer, l_suppkey integer,
l_linenumber integer,
l_quantity double, l_extendedprice double, l_discount double, l_tax double,
l_returnflag string,
l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string,
l_shipinstruct string,
l_shipmode string, l_comment string)
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("lineitem")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE orders(
      |o_orderkey integer, o_custkey integer,
      | o_orderstatus VARCHAR(1),
      | o_totalprice double,
      | o_orderdate string,
      | o_orderpriority VARCHAR(15),
      | o_clerk VARCHAR(15),
      | o_shippriority integer,
      | o_comment VARCHAR(79))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("orders")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE partsupp(
| ps_partkey integer, ps_suppkey integer,
| ps_availqty integer, ps_supplycost double,
| ps_comment VARCHAR(199))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("partsupp")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE supplier(
s_suppkey integer, s_name string, s_address string,
s_nationkey integer,
| s_phone string, s_acctbal double, s_comment string)
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("supplier")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE part(p_partkey integer, p_name string,
| p_mfgr string, p_brand string, p_type string, p_size integer, p_container string,
| p_retailprice double,
| p_comment string)
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("part")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE customer(
| c_custkey INTEGER,
| c_name VARCHAR(25),
| c_address VARCHAR(40),
| c_nationkey INTEGER,
| c_phone VARCHAR(15),
| c_acctbal double,
| c_mktsegment VARCHAR(10),
| c_comment VARCHAR(117))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("customer")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE custnation(
| cn_nationkey integer, cn_name VARCHAR(25),
| cn_regionkey integer, cn_comment VARCHAR(152))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("nation")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE custregion(
| cr_regionkey integer, cr_name VARCHAR(25),
| cr_comment VARCHAR(152))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("region")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE suppnation(
| sn_nationkey integer, sn_name VARCHAR(25),
| sn_regionkey integer, sn_comment VARCHAR(152))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("nation")}",
header "false", delimiter "|")""".stripMargin)
sql(s"""CREATE TEMPORARY TABLE suppregion(
| sr_regionkey integer, sr_name VARCHAR(25),
| sr_comment VARCHAR(152))
USING com.databricks.spark.csv
OPTIONS (path "${tpchDataFolder("region")}",
header "false", delimiter "|")""".stripMargin)
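Every table above is read with header "false", so the CSV files carry no column names; as we understand spark-csv, fields are matched to the declared columns by position, split on the "|" delimiter. A plain-Scala sketch of that positional mapping for one region-style row follows; the Region class and the sample line are made up for illustration.

```scala
// Hypothetical illustration of positional, header-less parsing; not spark-csv code.
object PipeDelimitedSketch {
  // Mirrors the region tables above: (regionkey integer, name VARCHAR(25), comment VARCHAR(152)).
  final case class Region(regionKey: Int, name: String, comment: String)

  /** Assigns the "|"-separated fields of one line to columns in declaration order. */
  def parseRegion(line: String): Region = {
    // A limit of -1 keeps empty trailing fields, so an empty comment still parses.
    val fields = line.split("\\|", -1)
    Region(fields(0).trim.toInt, fields(1), fields(2))
  }
}
```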
The following statement links the lineitembase DataFrame with the Druid Index and also defines the star schema (the relationships between the tables and their join columns). The Star Schema Definition page describes how to define a star schema.
sql("""CREATE TEMPORARY TABLE lineitem
USING org.sparklinedata.druid
OPTIONS (sourceDataframe "lineitembase",
timeDimensionColumn "l_shipdate",
druidDatasource "tpch",
druidHost "localhost",
druidPort "8082",
columnMapping '{ "l_quantity" : "sum_l_quantity", "ps_availqty" : "sum_ps_availqty", "cn_name" : "c_nation", "cr_name" : "c_region", "sn_name" : "s_nation", "sr_name" : "s_region" } ',
functionalDependencies '[ {"col1" : "c_name", "col2" : "c_address", "type" : "1-1"}, {"col1" : "c_phone", "col2" : "c_address", "type" : "1-1"}, {"col1" : "c_name", "col2" : "c_mktsegment", "type" : "n-1"}, {"col1" : "c_name", "col2" : "c_comment", "type" : "1-1"}, {"col1" : "c_name", "col2" : "c_nation", "type" : "n-1"}, {"col1" : "c_nation", "col2" : "c_region", "type" : "n-1"} ] ',
starSchema ' { "factTable" : "lineitem", "relations" : [ { "leftTable" : "lineitem", "rightTable" : "orders", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "l_orderkey", "rightAttribute" : "o_orderkey" } ] }, { "leftTable" : "lineitem", "rightTable" : "partsupp", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "l_partkey", "rightAttribute" : "ps_partkey" }, { "leftAttribute" : "l_suppkey", "rightAttribute" : "ps_suppkey" } ] }, { "leftTable" : "partsupp", "rightTable" : "part", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "ps_partkey", "rightAttribute" : "p_partkey" } ] }, { "leftTable" : "partsupp", "rightTable" : "supplier", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "ps_suppkey", "rightAttribute" : "s_suppkey" } ] }, { "leftTable" : "orders", "rightTable" : "customer", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "o_custkey", "rightAttribute" : "c_custkey" } ] }, { "leftTable" : "customer", "rightTable" : "custnation", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "c_nationkey", "rightAttribute" : "cn_nationkey" } ] }, { "leftTable" : "custnation", "rightTable" : "custregion", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "cn_regionkey", "rightAttribute" : "cr_regionkey" } ] }, { "leftTable" : "supplier", "rightTable" : "suppnation", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "s_nationkey", "rightAttribute" : "sn_nationkey" } ] }, { "leftTable" : "suppnation", "rightTable" : "suppregion", "relationType" : "n-1", "joinCondition" : [ { "leftAttribute" : "sn_regionkey", "rightAttribute" : "sr_regionkey" } ] } ] } ')""")
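The starSchema JSON describes a tree of n-1 joins rooted at the fact table. As an illustration only (the case classes mirror the JSON keys, but this is not SparklineData's parser), the sketch below transcribes the relations and finds the chain of joins that links lineitem to a given dimension table. Because every table appears as a rightTable at most once, the depth-first search cannot loop.

```scala
// Hypothetical model of the starSchema option; not SparklineData code.
object StarSchemaSketch {
  final case class JoinCondition(leftAttribute: String, rightAttribute: String)
  final case class Relation(leftTable: String, rightTable: String, relationType: String,
                            joinCondition: Seq[JoinCondition])

  // Transcribed by hand from the starSchema option above.
  val factTable = "lineitem"
  val relations: Seq[Relation] = Seq(
    Relation("lineitem", "orders", "n-1", Seq(JoinCondition("l_orderkey", "o_orderkey"))),
    Relation("lineitem", "partsupp", "n-1",
      Seq(JoinCondition("l_partkey", "ps_partkey"), JoinCondition("l_suppkey", "ps_suppkey"))),
    Relation("partsupp", "part", "n-1", Seq(JoinCondition("ps_partkey", "p_partkey"))),
    Relation("partsupp", "supplier", "n-1", Seq(JoinCondition("ps_suppkey", "s_suppkey"))),
    Relation("orders", "customer", "n-1", Seq(JoinCondition("o_custkey", "c_custkey"))),
    Relation("customer", "custnation", "n-1", Seq(JoinCondition("c_nationkey", "cn_nationkey"))),
    Relation("custnation", "custregion", "n-1", Seq(JoinCondition("cn_regionkey", "cr_regionkey"))),
    Relation("supplier", "suppnation", "n-1", Seq(JoinCondition("s_nationkey", "sn_nationkey"))),
    Relation("suppnation", "suppregion", "n-1", Seq(JoinCondition("sn_regionkey", "sr_regionkey"))))

  /** Relations to follow, in order, from the fact table out to `table`; None if unreachable. */
  def pathTo(table: String): Option[List[Relation]] = {
    def go(current: String): Option[List[Relation]] =
      if (current == table) Some(Nil)
      else relations.iterator
        .filter(_.leftTable == current)                     // edges leaving the current table
        .map(rel => go(rel.rightTable).map(rest => rel :: rest))
        .collectFirst { case Some(path) => path }           // first branch that reaches `table`
    go(factTable)
  }
}
```

For example, `StarSchemaSketch.pathTo("custregion")` yields the joins lineitem to orders, orders to customer, customer to custnation, and custnation to custregion.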
Now you can run queries against the star schema; they will be rewritten to use the Druid Index. For example, the following is TPCH Q3:
sql("""select o_orderkey,
sum(l_extendedprice) as price, o_orderdate,
o_shippriority
from customer,
orders,
lineitem
where c_mktsegment = 'BUILDING' and dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15"))
and c_custkey = o_custkey
and l_orderkey = o_orderkey
group by o_orderkey,
o_orderdate,
o_shippriority""").show()
Compare the above with TPCH Q3 running against the raw lineitembase DataFrame:
sql("""select o_orderkey,
sum(l_extendedprice) as price, o_orderdate,
o_shippriority
from customer,
orders,
lineitembase
where c_mktsegment = 'BUILDING' and dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15"))
and c_custkey = o_custkey
and l_orderkey = o_orderkey
group by o_orderkey,
o_orderdate,
o_shippriority""").show()
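Both Q3 variants express the TPC-H date predicates (o_orderdate before 1995-03-15, l_shipdate after it) with the spark-datetime functions. The sketch below states the same filter with java.time in place of spark-datetime, assuming dateIsBefore and dateIsAfter are strict comparisons; the object and method names are hypothetical.

```scala
import java.time.LocalDate

// Hypothetical restatement of the Q3 date filter; java.time stands in for spark-datetime.
object Q3DateFilter {
  private val Cutoff = LocalDate.parse("1995-03-15")

  // Assumption: dateIsBefore/dateIsAfter are strict, like LocalDate.isBefore/isAfter.
  def keep(orderDate: String, shipDate: String): Boolean =
    LocalDate.parse(orderDate).isBefore(Cutoff) && LocalDate.parse(shipDate).isAfter(Cutoff)
}
```

Under this assumption, a row whose o_orderdate or l_shipdate falls exactly on 1995-03-15 is excluded, matching the `<` and `>` comparisons in the TPC-H definition of Q3.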
