Skip to content
This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Query Plan Transformations and Optimizations

hbutani edited this page Jul 19, 2016 · 1 revision

Query Rewrite Patterns

We support the acceleration of the following Query Patterns:

Group By Queries

These are typical Slice-and-Dice queries, a Group By on a set of Group Expressions with optional dimensional filters, a optional having, order and limit clauses on top of the Group By. These are translated to a Druid Group-By query(under certain conditions the Group By is optimized to a Druid TimeSeries or TopN query). The results of the Druid Query are bridged into the Spark Plan via the DruidRDD instance, a Projection on top of the DruidRDD handles any datatype translations, and value projections.

For example:

SELECT f, s, Count(*) AS count_order 
FROM   (SELECT l_returnflag AS f, 
               l_linestatus AS s, 
               l_shipdate, 
               s_region, 
               s_nation, 
               c_nation, 
               p_type 
        FROM   orderlineitempartsupplier) t 
WHERE  Dateisbeforeorequal(Datetime(`l_shipdate`), 
              Dateminus(Datetime("1997-12-01"), Period("p90d"))) 
       AND ( ( s_nation = 'FRANCE' 
               AND c_nation = 'GERMANY' ) 
              OR ( c_nation = 'FRANCE' 
                   AND s_nation = 'GERMANY' ) ) 
       AND p_type = 'ECONOMY ANODIZED STEEL' 
GROUP  BY f, 
          s 
ORDER  BY f, 
          s

The optimized logical plan for this query is:

Sort [f#318 ASC,s#319 ASC], true
+- Aggregate [f#318,s#319], [f#318,s#319,(count(1),mode=Complete,isDistinct=false) AS count_order#320L]
   +- Project [l_returnflag#337 AS f#318,l_linestatus#338 AS s#319]
      +- Filter ((UDF(UDF(l_shipdate#339),UDF(UDF(1997-12-01),UDF(P90D))) && (((s_nation#356 = FRANCE) && (c_nation#372 = GERMANY)) || ((c_nation#372 = FRANCE) && (s_nation#356 = GERMANY)))) && (p_type#361 = ECONOMY ANODIZED STEEL))
         +- Relation[o_orderkey#321,o_custkey#322,o_orderstatus#323,o_totalprice#324,o_orderdate#325,o_orderpriority#326,o_clerk#327,o_shippriority#328,o_comment#329,l_partkey#330,l_suppkey#331,l_linenumber#332,l_quantity#333,l_extendedprice#334,l_discount#335,l_tax#336,l_returnflag#337,l_linestatus#338,l_shipdate#339,l_commitdate#340,l_receiptdate#341,l_shipinstruct#342,l_shipmode#343,l_comment#344,order_year#345,ps_partkey#346,ps_suppkey#347,ps_availqty#348,ps_supplycost#349,ps_comment#350,s_name#351,s_address#352,s_phone#353,s_acctbal#354,s_comment#355,s_nation#356,s_region#357,p_name#358,p_mfgr#359,p_brand#360,p_type#361,p_size#362,p_container#363,p_retailprice#364,p_comment#365,c_name#366,c_address#367,c_phone#368,c_acctbal#369,c_mktsegment#370,c_comment#371,c_nation#372,c_region#373] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))

A typical Scan -> Filter -> Project -> Aggregate -> Sort query pattern. The physical plan or this query is shown below. The Filter, Aggregate and Sort operations are pushed to Druid as a GroupBy query.

Project [f#318,s#319,alias-1#375L AS count_order#320L]
+- Scan DruidQuery(2137263483): {
  "q" : {
    "jsonClass" : "GroupByQuerySpec",
    "queryType" : "groupBy",
    "dataSource" : "tpch",
    "dimensions" : [ {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "l_returnflag",
      "outputName" : "f"
    }, {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "l_linestatus",
      "outputName" : "s"
    } ],
    "limitSpec" : {
      "jsonClass" : "LimitSpec",
      "type" : "default",
      "limit" : 2147483647,
      "columns" : [ {
        "jsonClass" : "OrderByColumnSpec",
        "dimension" : "f",
        "direction" : "ascending"
      }, {
        "jsonClass" : "OrderByColumnSpec",
        "dimension" : "s",
        "direction" : "ascending"
      } ]
    },
    "granularity" : "all",
    "filter" : {
      "jsonClass" : "LogicalFilterSpec",
      "type" : "and",
      "fields" : [ {
        "jsonClass" : "LogicalFilterSpec",
        "type" : "or",
        "fields" : [ {
          "jsonClass" : "LogicalFilterSpec",
          "type" : "and",
          "fields" : [ {
            "jsonClass" : "SelectorFilterSpec",
            "type" : "selector",
            "dimension" : "s_nation",
            "value" : "FRANCE"
          }, {
            "jsonClass" : "SelectorFilterSpec",
            "type" : "selector",
            "dimension" : "c_nation",
            "value" : "GERMANY"
          } ]
        }, {
          "jsonClass" : "LogicalFilterSpec",
          "type" : "and",
          "fields" : [ {
            "jsonClass" : "SelectorFilterSpec",
            "type" : "selector",
            "dimension" : "c_nation",
            "value" : "FRANCE"
          }, {
            "jsonClass" : "SelectorFilterSpec",
            "type" : "selector",
            "dimension" : "s_nation",
            "value" : "GERMANY"
          } ]
        } ]
      }, {
        "jsonClass" : "SelectorFilterSpec",
        "type" : "selector",
        "dimension" : "p_type",
        "value" : "ECONOMY ANODIZED STEEL"
      } ]
    },
    "aggregations" : [ {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "longSum",
      "name" : "alias-1",
      "fieldName" : "count"
    } ],
    "intervals" : [ "1993-01-01T00:00:00.000Z/1997-09-02T00:00:00.001Z" ]
  },
  "queryHistoricalServer" : false,
  "numSegmentsPerQuery" : -1,
  "intervalSplits" : [ {
    "start" : 725846400000,
    "end" : 873158400001
  } ],
  "outputAttrSpec" : [ {
    "exprId" : {
      "id" : 318,
      "jvmId" : { }
    },
    "name" : "f",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 319,
      "jvmId" : { }
    },
    "name" : "s",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 375,
      "jvmId" : { }
    },
    "name" : "alias-1",
    "dataType" : { },
    "tf" : "toLong"
  } ]
}[f#318,s#319,alias-1#375L] 

Cube/Rollup Queries

Grouping Set queries like Cube and Rollups are executed as a set of Druid Queries, one for each Grouping Set. A Union Operator is used to combine the output of all the Druid Queries.

For example

select l_returnflag, l_linestatus, grouping__id,
       count(*), sum(l_extendedprice) as s 
from orderLineItemPartSupplier 
where s_nation = 'FRANCE'
group by l_returnflag, l_linestatus with rollup

The optimized logical plan for this query is:

Aggregate [l_returnflag#374,l_linestatus#375,grouping__id#373], [l_returnflag#374,l_linestatus#375,grouping__id#373,(count(1),mode=Complete,isDistinct=false) AS _c3#372L,(sum(l_extendedprice#332),mode=Complete,isDistinct=false) AS s#318]
+- Project [l_returnflag#374,l_linestatus#375,grouping__id#373,l_extendedprice#332]
   +- Expand [List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, c_nation#370, c_region#371, null, null, 0),List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, c_nation#370, c_region#371, l_returnflag#374, null, 1),List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, c_nation#370, c_region#371, l_returnflag#374, l_linestatus#375, 3)], [o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371,l_returnflag#374,l_linestatus#375,grouping__id#373]
      +- Project [o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371,l_returnflag#335 AS l_returnflag#374,l_linestatus#336 AS l_linestatus#375]
         +- Filter (s_nation#354 = FRANCE)
            +- Relation[o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))

The physical plan or this query is shown below.

Union
:- Project [null AS l_returnflag#374,null AS l_linestatus#375,0 AS grouping__id#373,alias-1#376L AS _c3#372L,alias-2#377 AS s#318]
:  +- Scan DruidQuery(63544302): {
  "q" : {
    "jsonClass" : "TimeSeriesQuerySpec",
    "queryType" : "timeseries",
    "dataSource" : "tpch",
    "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ],
    "granularity" : "all",
    "filter" : {
      "jsonClass" : "SelectorFilterSpec",
      "type" : "selector",
      "dimension" : "s_nation",
      "value" : "FRANCE"
    },
    "aggregations" : [ {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "longSum",
      "name" : "alias-1",
      "fieldName" : "count"
    }, {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "doubleSum",
      "name" : "alias-2",
      "fieldName" : "l_extendedprice"
    } ]
  },
  "queryHistoricalServer" : false,
  "numSegmentsPerQuery" : -1,
  "intervalSplits" : [ {
    "start" : 725846400000,
    "end" : 883526401000
  } ],
  "outputAttrSpec" : [ {
    "exprId" : {
      "id" : 376,
      "jvmId" : { }
    },
    "name" : "alias-1",
    "dataType" : { },
    "tf" : "toLong"
  }, {
    "exprId" : {
      "id" : 377,
      "jvmId" : { }
    },
    "name" : "alias-2",
    "dataType" : { },
    "tf" : ""
  } ]
}[alias-1#376L,alias-2#377] 
:- Project [l_returnflag#374,null AS l_linestatus#375,1 AS grouping__id#373,alias-3#378L AS _c3#372L,alias-4#379 AS s#318]
:  +- Scan DruidQuery(72665178): {
  "q" : {
    "jsonClass" : "GroupByQuerySpec",
    "queryType" : "groupBy",
    "dataSource" : "tpch",
    "dimensions" : [ {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "l_returnflag",
      "outputName" : "l_returnflag"
    } ],
    "granularity" : "all",
    "filter" : {
      "jsonClass" : "SelectorFilterSpec",
      "type" : "selector",
      "dimension" : "s_nation",
      "value" : "FRANCE"
    },
    "aggregations" : [ {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "longSum",
      "name" : "alias-3",
      "fieldName" : "count"
    }, {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "doubleSum",
      "name" : "alias-4",
      "fieldName" : "l_extendedprice"
    } ],
    "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ]
  },
  "queryHistoricalServer" : false,
  "numSegmentsPerQuery" : -1,
  "intervalSplits" : [ {
    "start" : 725846400000,
    "end" : 883526401000
  } ],
  "outputAttrSpec" : [ {
    "exprId" : {
      "id" : 374,
      "jvmId" : { }
    },
    "name" : "l_returnflag",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 378,
      "jvmId" : { }
    },
    "name" : "alias-3",
    "dataType" : { },
    "tf" : "toLong"
  }, {
    "exprId" : {
      "id" : 379,
      "jvmId" : { }
    },
    "name" : "alias-4",
    "dataType" : { },
    "tf" : ""
  } ]
}[l_returnflag#374,alias-3#378L,alias-4#379] 
+- Project [l_returnflag#374,l_linestatus#375,3 AS grouping__id#373,alias-5#380L AS _c3#372L,alias-6#381 AS s#318]
   +- Scan DruidQuery(1269235678): {
  "q" : {
    "jsonClass" : "GroupByQuerySpec",
    "queryType" : "groupBy",
    "dataSource" : "tpch",
    "dimensions" : [ {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "l_returnflag",
      "outputName" : "l_returnflag"
    }, {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "l_linestatus",
      "outputName" : "l_linestatus"
    } ],
    "granularity" : "all",
    "filter" : {
      "jsonClass" : "SelectorFilterSpec",
      "type" : "selector",
      "dimension" : "s_nation",
      "value" : "FRANCE"
    },
    "aggregations" : [ {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "longSum",
      "name" : "alias-5",
      "fieldName" : "count"
    }, {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "doubleSum",
      "name" : "alias-6",
      "fieldName" : "l_extendedprice"
    } ],
    "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ]
  },
  "queryHistoricalServer" : false,
  "numSegmentsPerQuery" : -1,
  "intervalSplits" : [ {
    "start" : 725846400000,
    "end" : 883526401000
  } ],
  "outputAttrSpec" : [ {
    "exprId" : {
      "id" : 374,
      "jvmId" : { }
    },
    "name" : "l_returnflag",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 375,
      "jvmId" : { }
    },
    "name" : "l_linestatus",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 380,
      "jvmId" : { }
    },
    "name" : "alias-5",
    "dataType" : { },
    "tf" : "toLong"
  }, {
    "exprId" : {
      "id" : 381,
      "jvmId" : { }
    },
    "name" : "alias-6",
    "dataType" : { },
    "tf" : ""
  } ]
}[l_returnflag#374,l_linestatus#375,alias-5#380L,alias-6#381] 

Star Join Queries

Queries that involve joining 1 or more Dimension tables with a Fact table where all the dimension and fact columns are part of the Druid Index are executed as a single Druid Query.

For example for the TPCH use case all the dimensions and measures are indexed. Consider the TPCH Query 3:

select
      o_orderkey,
      sum(l_extendedprice) as price, o_orderdate,
      o_shippriority
      from customer,
orders,
lineitem
      where c_mktsegment = 'BUILDING' and 
            dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and 
            dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15"))
and c_custkey = o_custkey
and l_orderkey = o_orderkey
      group by o_orderkey,
      o_orderdate,
      o_shippriority

The optimized Logical Plan joins Customer, Orders and LineItem tables and then performs an Aggregation.

Aggregate [o_orderkey#414,o_orderdate#418,o_shippriority#421], [o_orderkey#414,(sum(l_extendedprice#732),mode=Complete,isDistinct=false) AS price#726,o_orderdate#418,o_shippriority#421]
+- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421,l_extendedprice#732]
   +- Join Inner, Some((l_orderkey#727 = o_orderkey#414))
      :- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421]
      :  +- Join Inner, Some((c_custkey#594 = o_custkey#415))
      :     :- Project [c_custkey#594]
      :     :  +- Filter (c_mktsegment#600 = BUILDING)
      :     :     +- InMemoryRelation [c_custkey#594,c_name#595,c_address#596,c_nationkey#597,c_phone#598,c_acctbal#599,c_mktsegment#600,c_comment#601], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
      :     +- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421,o_custkey#415]
      :        +- Filter UDF(UDF(o_orderdate#418),UDF(1995-03-15))
      :           +- InMemoryRelation [o_orderkey#414,o_custkey#415,o_orderstatus#416,o_totalprice#417,o_orderdate#418,o_orderpriority#419,o_clerk#420,o_shippriority#421,o_comment#422], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
      +- Project [l_extendedprice#732,l_orderkey#727]
         +- Filter UDF(UDF(l_shipdate#737),UDF(1995-03-15))
            +- Relation[l_orderkey#727,l_partkey#728,l_suppkey#729,l_linenumber#730,l_quantity#731,l_extendedprice#732,l_discount#733,l_tax#734,l_returnflag#735,l_linestatus#736,l_shipdate#737,l_commitdate#738,l_receiptdate#739,l_shipinstruct#740,l_shipmode#741,l_comment#742] DruidRelationInfo(fullName = DruidRelationName(lineitem,localhost,tpch), sourceDFName = lineItemBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))

The physical plan or this query is shown below. The Joins, and Aggregate operations are pushed to Druid as a single GroupBy query.

Project [cast(o_orderkey#414 as int) AS o_orderkey#414,alias-1#828 AS price#726,o_orderdate#418,cast(o_shippriority#421 as int) AS o_shippriority#421]
+- Scan DruidQuery(963099648): {
  "q" : {
    "jsonClass" : "GroupByQuerySpec",
    "queryType" : "groupBy",
    "dataSource" : "tpch",
    "dimensions" : [ {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "o_orderkey",
      "outputName" : "o_orderkey"
    }, {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "o_orderdate",
      "outputName" : "o_orderdate"
    }, {
      "jsonClass" : "DefaultDimensionSpec",
      "type" : "default",
      "dimension" : "o_shippriority",
      "outputName" : "o_shippriority"
    } ],
    "granularity" : "all",
    "filter" : {
      "jsonClass" : "LogicalFilterSpec",
      "type" : "and",
      "fields" : [ {
        "jsonClass" : "SelectorFilterSpec",
        "type" : "selector",
        "dimension" : "c_mktsegment",
        "value" : "BUILDING"
      }, {
        "jsonClass" : "JavascriptFilterSpec",
        "type" : "javascript",
        "dimension" : "o_orderdate",
        "function" : "function(x) { return(x < '1995-03-15T00:00:00.000Z') }"
      } ]
    },
    "aggregations" : [ {
      "jsonClass" : "FunctionAggregationSpec",
      "type" : "doubleSum",
      "name" : "alias-1",
      "fieldName" : "l_extendedprice"
    } ],
    "intervals" : [ "1995-03-15T00:00:00.001Z/1997-12-31T00:00:01.000Z" ]
  },
  "queryHistoricalServer" : false,
  "numSegmentsPerQuery" : -1,
  "intervalSplits" : [ {
    "start" : 795225600001,
    "end" : 883526401000
  } ],
  "outputAttrSpec" : [ {
    "exprId" : {
      "id" : 414,
      "jvmId" : { }
    },
    "name" : "o_orderkey",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 418,
      "jvmId" : { }
    },
    "name" : "o_orderdate",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 421,
      "jvmId" : { }
    },
    "name" : "o_shippriority",
    "dataType" : { },
    "tf" : "toString"
  }, {
    "exprId" : {
      "id" : 828,
      "jvmId" : { }
    },
    "name" : "alias-1",
    "dataType" : { },
    "tf" : ""
  } ]
}[o_orderkey#414,o_orderdate#418,o_shippriority#421,alias-1#828] 

Multi-Level Group By using Joins

Unlike the other patterns this involves a Logical transformation. A common analytical pattern is to compare Aggregates between different levels of Aggregation. The unoptimized logical plan for this query performs a Join on the output of the first Aggregation. The Physical Plan for this Logical Plan would only be able to push the inner Aggregate to Druid, and would entail a costly Scan of the Fact table in Spark.

For example the following computes the overall min. linenumber, the total account balance and count and then uses these statistics when computing metrics by linenumber.

select r1.l_linenumber x, sum(r1.l_quantity) y, (mi + 10) as z
from orderLineItemPartSupplier r1 join
       (select min(l_linenumber) mi, sum(c_acctbal) ma, count(1) 
        from orderLineItemPartSupplier
        where not(l_shipdate is null) having count(1) > 0
       ) r2
group by r1.l_linenumber, (mi + 10)
order by x, y, z

the unoptimized plan for this query is:

Sort [x#320 ASC,y#321 ASC,z#322 ASC], true
+- Aggregate [l_linenumber#334,(mi#318 + 10)], [l_linenumber#334 AS x#320,(sum(l_quantity#335),mode=Complete,isDistinct=false) AS y#321,(mi#318 + 10) AS z#322]
   +- Join Inner, None
      :- Subquery r1
      :  +- Subquery orderlineitempartsupplier
      :     +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
      +- Subquery r2
         +- Project [mi#318,ma#319,_c2#376L]
            +- Filter havingCondition#377: boolean
               +- Aggregate [cast(((count(1),mode=Complete,isDistinct=false) > cast(0 as bigint)) as boolean) AS havingCondition#377,(min(l_linenumber#334),mode=Complete,isDistinct=false) AS mi#318,(sum(c_acctbal#371),mode=Complete,isDistinct=false) AS ma#319,(count(1),mode=Complete,isDistinct=false) AS _c2#376L]
                  +- Filter NOT isnull(l_shipdate#341)
                     +- Subquery orderlineitempartsupplier
                        +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))

This Logical Optimization pushes the outer Aggregation below the Join. Now we perform 2 very fast aggregations in Druid followed by a Join of very small aggregated results.

+- Project [x#320,y#321,(mi#318 + 10) AS z#322]
   +- Join Inner, None
      :- Aggregate [l_linenumber#334], [l_linenumber#334 AS x#320,(sum(l_quantity#335),mode=Complete,isDistinct=false) AS y#321]
      :  +- Project [l_linenumber#334,l_quantity#335]
      :     +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
      +- Project [mi#318]
         +- Filter havingCondition#377: boolean
            +- Aggregate [((count(1),mode=Complete,isDistinct=false) > 0) AS havingCondition#377,(min(l_linenumber#334),mode=Complete,isDistinct=false) AS mi#318,(sum(c_acctbal#371),mode=Complete,isDistinct=false) AS ma#319,(count(1),mode=Complete,isDistinct=false) AS _c2#376L]
               +- Project [l_linenumber#334,c_acctbal#371]
                  +- Filter NOT isnull(l_shipdate#341)
                     +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase,
timeDimensionCol = l_shipdate,
options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))

Expression Pushdown to Druid

Filter Expressions

Predicates on dimension columns are pushed to Druid as filter specifications. There are 3 types of transformations.

Direct transformations

  • simple comparison expressions of the form name = 'X' or size > 10 are transformed into Druid Select or Bound filters.
  • Logical operators: And, Or, Not are translated to the Druid equivalent operators.
  • In, InSet is transformed to ExtractionFilter in Druid.
  • IsNull is translated to a Selection of the empty string.

Interval transformations

Certain Predicates on the time Dimension are transformed into intervals on the Druid Query:

  • predicates of the form timeDim compOp literal or literal compOp timeDim are converted into an interval on the Druid Query. The literal can be a String, Date or Timestamp.
  • invocation of date comparison functions from the spark-datetime package are also converted to an interval on the Druid Query

Javascript transformations

This is a catch all transformation, it handles expressions that don’t fall into the above 2 categories. As of this writing it handles the following expressions:

  • Attribute references
  • Literals include null values
  • Cast expressions
  • String functions: Concat, Lower, Upper, Substring
  • Case invocations
  • the Coalesce function
  • Comparisons: ~<,<=,=,!=,>,>=, in, not in, like, stringPredicate, ~
  • Date functions: toDate, DateAdd, DateSub, DateDiff, Year, Quarter, Month, DayOfMonth, WeekOfYear, Hour, Minute, Second, FromUnixTime, UnixTimestamp.
  • Arithmetic functions: +, -, *, /, mod, remainder, abs, floor, ceil, log, sqrt, unaryminus, unaryplus

Group By Expressions

Group Expressions contain dimension columns are pushed to Druid as Dimension specifications. There are 3 types of transformations.

Transformation of Attribute references

This is straightforward transformation of a group expression on a dimension column in SQL into a Druid DefaultDimensionSpec.

Transformation of time dimension expressions

We attempt to transform expressions on the time dimension into a Druid TimeParsingExtraction or TimeFormatExtraction. We handle the following functions:

  • AttributeReferences
  • Casts to DateType, Timestamp
  • ToDate
  • DateTime element functions: Year, DayOfMonth, DayOfYear, Month, WeekOfYear, Hour, Minute, Second, UnixTimestamp, FromUnixTime, FromUTCTimestamp, ToUTCTimestamp.

We also attempt to transform spark-datetime package expressions to use the Druid time extraction specs. We handle the following functions: era, centuryOfEra, yearOfEra, yearOfCentury, year, weekyear, monthOfYear, monthOfYearName, weekOfWeekyear, dayOfYear, dayOfMonth, dayOfWeek, dayOfWeekName, hourOfDay, secondOfMinute

Javascript transformations

This is a catch all transformation, it handles expressions that don’t fall into the above 2 categories. It handles all the patterns listed in the Filter Expressions section.

Aggregate Expressions

Aggregate expressions in SQL are transformed into Aggregate Specifications on the Druid Query. There are 2 forms of tranformations.

Native function transforms Simple patterns of the form aggFunc(metric) are transformed into a Druid FunctionAggregation spec.

Javascript transforms Any other expression that contains any of the subexpressions listed for Filter Javascript is handled by this case. These expressions can reference both dimension and metric columns. Such an expression is transformed into Javascript Aggregation Spec. in Druid.

Clone this wiki locally