
Quick Start Guide


Install and Set Up Druid

Follow the Druid install and setup instructions here.

Download and unzip Spark

Download a Spark release and unzip it. As of this writing, we have tested with spark-1.5.2.

The TPCH dataset

We include a sample of the TPCH scale-1 dataset in the quickstart folder. It contains a sample of facts from the datascale1 dataset. Note that the rows in the flattened dataset and the original lineitem table may not be the same, so don't use these datasets to compare result values.

Index the sample TPCH dataset

Follow the instructions here to create the Druid Index for the flattened dataset. Copy and edit tpch_index_task.json.template; you need to change the baseDir location to match your environment.
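
Once the indexing task completes, you can check that the tpch datasource is being served by the Druid broker. The following is a minimal sketch, assuming the broker runs at localhost:8082 (the host and port used in the table definitions below); it can be run from the spark-shell or any Scala REPL.

// Ask the broker which datasources it serves; the result should include "tpch".
// Assumes the broker is at localhost:8082 -- adjust to your setup.
import scala.io.Source
Source.fromURL("http://localhost:8082/druid/v2/datasources").mkString
// expected: a JSON array such as ["tpch"]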

Starting the spark-shell

When starting the spark-shell, include the SparklineData packages and set up the DruidPlanner.

For example, here is how you can start the spark-shell:

bin/spark-shell \
--packages com.databricks:spark-csv_2.10:1.1.0,SparklineData:spark-datetime:0.0.2,SparklineData:spark-druid-olap:0.0.2

In the shell, issue the following setup statements. These register the Sparklinedata datetime functions and the rewrite rules of the Sparklinedata DruidPlanner with the Spark SQLContext.

import org.sparklinedata.spark.dateTime.Functions._
import org.apache.spark.sql.sources.druid.DruidPlanner

// register the Sparklinedata date-time functions with the SQLContext
register(sqlContext)
// add the DruidPlanner rewrite rules to the SQLContext
DruidPlanner(sqlContext)

Use Case 1: Flattened Dataset Queries in the Spark Shell

Set up the Datasets in Spark:

  • You need to change the path below to point to your folder containing the orderLineItemPartSupplierCustomer flattened dataset.
sql("""
CREATE TEMPORARY TABLE orderLineItemPartSupplierBase(o_orderkey integer,
             o_custkey integer,
      o_orderstatus string, o_totalprice double, o_orderdate string, o_orderpriority string,
      o_clerk string,
      o_shippriority integer, o_comment string, l_partkey integer, l_suppkey integer,
      l_linenumber integer,
      l_quantity double, l_extendedprice double, l_discount double, l_tax double,
      l_returnflag string,
      l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string,
      l_shipinstruct string,
      l_shipmode string, l_comment string, order_year string, ps_partkey integer,
      ps_suppkey integer,
      ps_availqty integer, ps_supplycost double, ps_comment string, s_name string, s_address string,
      s_phone string, s_acctbal double, s_comment string, s_nation string,
      s_region string, p_name string,
      p_mfgr string, p_brand string, p_type string, p_size integer, p_container string,
      p_retailprice double,
      p_comment string, c_name string , c_address string , c_phone string , c_acctbal double ,
      c_mktsegment string , c_comment string , c_nation string , c_region string)
      USING com.databricks.spark.csv
      OPTIONS (path "<location of orderLineItemPartSupplierCustomer>",
      header "false", delimiter "|")
""".stripMargin
)

// now create the orderLineItemPartSupplier DataFrame that links the raw data with the Druid Index
sql("""
CREATE TEMPORARY TABLE orderLineItemPartSupplier
      USING org.sparklinedata.druid
      OPTIONS (sourceDataframe "orderLineItemPartSupplierBase",
      timeDimensionColumn "l_shipdate",
      druidDatasource "tpch",
      druidHost "localhost",
      druidPort "8082",
      columnMapping '{  "l_quantity" : "sum_l_quantity",  "ps_availqty" : "sum_ps_availqty",  "cn_name" : "c_nation",  "cr_name" : "c_region",   "sn_name" : "s_nation",  "sr_name" : "s_region" }     ',
      functionalDependencies '[   {"col1" : "c_name", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_phone", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_mktsegment", "type" : "n-1"},   {"col1" : "c_name", "col2" : "c_comment", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_nation", "type" : "n-1"},   {"col1" : "c_nation", "col2" : "c_region", "type" : "n-1"} ]     ',
      starSchema ' {   "factTable" : "orderLineItemPartSupplier",   "relations" : []  }     ')
""".stripMargin
)
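
As a quick sanity check, confirm that the Druid-backed table is wired up correctly. This simple count is only a sketch; it may or may not be rewritten to Druid, but it should run and return the row count of the flattened dataset.

// sanity check: the Druid-backed table should be queryable
sql("select count(*) from orderLineItemPartSupplier").show()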

Now you can run queries against orderLineItemPartSupplier, which will be rewritten to use the Druid Index. For example:

sql("""
select l_returnflag as r, l_linestatus as ls, 
  count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a  
from orderLineItemPartSupplier 
group by l_returnflag, l_linestatus 
order by s, ls, r 
limit 3""".stripMargin
).show()

You can compare this with the same query run, without rewriting, against the raw flattened table:

sql("""
select l_returnflag as r, l_linestatus as ls, 
  count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a  
from orderLineItemPartSupplierBase
group by l_returnflag, l_linestatus 
order by s, ls, r 
limit 3""".stripMargin
).show()
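
To see whether the first query was actually rewritten, you can inspect its physical plan. The exact operator names depend on the plugin version, but a rewritten plan scans the Druid datasource instead of the CSV-backed base table.

// print the physical plan; a rewritten plan reads from the Druid datasource
sql("""
select l_returnflag as r, l_linestatus as ls,
  count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a
from orderLineItemPartSupplier
group by l_returnflag, l_linestatus
order by s, ls, r
limit 3""".stripMargin
).explain()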

Use Case 2: Star Schema Queries in the Spark Shell

Set up the TPCH tables

  • Set TPCH_BASE_DIR appropriately. In the following we assume it is at ~/tpch/datascale1.
val TPCH_BASE_DIR = sys.env("HOME") + "/tpch/datascale1"

def tpchDataFolder(tableName : String) = s"$TPCH_BASE_DIR/$tableName/"

sql(s"""CREATE TEMPORARY TABLE lineitembase(l_orderkey integer,
        l_partkey integer, l_suppkey integer,
      l_linenumber integer,
      l_quantity double, l_extendedprice double, l_discount double, l_tax double,
      l_returnflag string,
      l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string,
      l_shipinstruct string,
      l_shipmode string, l_comment string)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("lineitem")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE orders(
           |o_orderkey integer, o_custkey integer,
           |    o_orderstatus VARCHAR(1),
           |    o_totalprice double,
           |    o_orderdate string,
           |    o_orderpriority VARCHAR(15),
           |    o_clerk VARCHAR(15),
           |    o_shippriority integer,
           |    o_comment VARCHAR(79)
    )
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("orders")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE partsupp(
           | ps_partkey integer, ps_suppkey integer,
           |    ps_availqty integer, ps_supplycost double,
           |    ps_comment VARCHAR(199)
    )
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("partsupp")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE supplier(
             s_suppkey integer, s_name string, s_address string,
             s_nationkey integer,
           |      s_phone string, s_acctbal double, s_comment string)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("supplier")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE part(p_partkey integer, p_name string,
           |      p_mfgr string, p_brand string, p_type string, p_size integer, p_container string,
           |      p_retailprice double,
           |      p_comment string)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("part")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE customer(
           | c_custkey INTEGER,
           |    c_name VARCHAR(25),
           |    c_address VARCHAR(40),
           |    c_nationkey INTEGER,
           |    c_phone VARCHAR(15),
           |    c_acctbal double,
           |    c_mktsegment VARCHAR(10),
           |    c_comment VARCHAR(117)
           |)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("customer")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE custnation(
           | cn_nationkey integer, cn_name VARCHAR(25),
           |    cn_regionkey integer, cn_comment VARCHAR(152)
           |)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("nation")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE custregion(
           | cr_regionkey integer, cr_name VARCHAR(25),
           |    cr_comment VARCHAR(152)
           |)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("region")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE suppnation(
           | sn_nationkey integer, sn_name VARCHAR(25),
           |    sn_regionkey integer, sn_comment VARCHAR(152)
           |)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("nation")}",
      header "false", delimiter "|")""".stripMargin)

    sql(s"""CREATE TEMPORARY TABLE suppregion(
           | sr_regionkey integer, sr_name VARCHAR(25),
           |    sr_comment VARCHAR(152)
           |)
      USING com.databricks.spark.csv
      OPTIONS (path "${tpchDataFolder("region")}",
      header "false", delimiter "|")""".stripMargin)

Set up the LineItem Sparkline Druid DataFrame

This links the lineitembase DataFrame with the Druid Index; we also define the Star Schema (column relationships) details here. The Star Schema Definition page describes how to define a Star Schema.

sql("""
CREATE TEMPORARY TABLE lineitem
      USING org.sparklinedata.druid
      OPTIONS (sourceDataframe "lineItemBase",
      timeDimensionColumn "l_shipdate",
      druidDatasource "tpch",
      druidHost "localhost",
      druidPort "8082",
      columnMapping '{  "l_quantity" : "sum_l_quantity",  "ps_availqty" : "sum_ps_availqty",  "cn_name" : "c_nation",  "cr_name" : "c_region",   "sn_name" : "s_nation",  "sr_name" : "s_region" }     ',
      functionalDependencies '[   {"col1" : "c_name", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_phone", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_mktsegment", "type" : "n-1"},   {"col1" : "c_name", "col2" : "c_comment", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_nation", "type" : "n-1"},   {"col1" : "c_nation", "col2" : "c_region", "type" : "n-1"} ]     ',
      starSchema ' {   "factTable" : "lineitem",   "relations" : [ {     "leftTable" : "lineitem",     "rightTable" : "orders",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "l_orderkey",       "rightAttribute" : "o_orderkey"     } ]   }, {     "leftTable" : "lineitem",     "rightTable" : "partsupp",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "l_partkey",       "rightAttribute" : "ps_partkey"     }, {       "leftAttribute" : "l_suppkey",       "rightAttribute" : "ps_suppkey"     } ]   }, {     "leftTable" : "partsupp",     "rightTable" : "part",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "ps_partkey",       "rightAttribute" : "p_partkey"     } ]   }, {     "leftTable" : "partsupp",     "rightTable" : "supplier",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "ps_suppkey",       "rightAttribute" : "s_suppkey"     } ]   }, {     "leftTable" : "orders",     "rightTable" : "customer",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "o_custkey",       "rightAttribute" : "c_custkey"     } ]   }, {     "leftTable" : "customer",     "rightTable" : "custnation",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "c_nationkey",       "rightAttribute" : "cn_nationkey"     } ]   }, {     "leftTable" : "custnation",     "rightTable" : "custregion",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "cn_regionkey",       "rightAttribute" : "cr_regionkey"     } ]   }, {     "leftTable" : "supplier",     "rightTable" : "suppnation",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "s_nationkey",       "rightAttribute" : "sn_nationkey"     } ]   }, {     "leftTable" : "suppnation",     "rightTable" : "suppregion",     "relationType" : "n-1",     "joinCondition" : [ {       "leftAttribute" : "sn_regionkey",       "rightAttribute" : "sr_regionkey"     } ]   } ] }   ')
""".stripMargin
)

Now you can run queries against the star schema; they will be rewritten to use the Druid Index. For example, the following is TPCH Q3:

sql("""

      select
      o_orderkey,
      sum(l_extendedprice) as price, o_orderdate,
      o_shippriority
      from customer,
orders,
lineitem
      where c_mktsegment = 'BUILDING' and dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15"))
and c_custkey = o_custkey
and l_orderkey = o_orderkey
      group by o_orderkey,
      o_orderdate,
      o_shippriority
""".stripMargin
).show()

Compare the above against TPCH Q3 running against the original lineitembase DataFrame:

sql("""

      select
      o_orderkey,
      sum(l_extendedprice) as price, o_orderdate,
      o_shippriority
      from customer,
orders,
lineitembase
      where c_mktsegment = 'BUILDING' and dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15"))
and c_custkey = o_custkey
and l_orderkey = o_orderkey
      group by o_orderkey,
      o_orderdate,
      o_shippriority
""".stripMargin
).show()