Query Plan Transformations and Optimizations
We support the acceleration of the following Query Patterns:
These are typical Slice-and-Dice queries: a Group By on a set of Group Expressions with optional dimensional filters, and optional having, order and limit clauses on top of the Group By. These are translated to a Druid GroupBy query (under certain conditions the Group By is optimized to a Druid TimeSeries or TopN query). The results of the Druid Query are bridged into the Spark Plan via the DruidRDD instance; a Projection on top of the DruidRDD handles any datatype translations and value projections.
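The conditions under which a Group By becomes a TimeSeries or TopN query are not spelled out here. The sketch below is one plausible rule, not the planner's actual logic: the TimeSeries case mirrors the rollup plan on this page, where the empty grouping set is issued as a `timeseries` query, while the TopN case is an assumption based on the general shape of Druid TopN queries (one dimension, ordered by one metric, with a limit). The function name and parameters are hypothetical.

```python
def choose_query_type(dimensions, order_by=None, limit=None, metrics=()):
    """Pick a Druid query type for an aggregation (illustrative rule only)."""
    if not dimensions:
        # No grouping columns: a single aggregated row per interval.
        return "timeseries"
    single_metric_order = (
        order_by is not None
        and len(order_by) == 1
        and order_by[0] in metrics
    )
    if len(dimensions) == 1 and limit is not None and single_metric_order:
        # Assumed TopN shape: one dimension, ranked by one metric, limited.
        return "topN"
    return "groupBy"


print(choose_query_type(["l_returnflag", "l_linestatus"], order_by=["f", "s"]))
```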
For example:
SELECT f, s, Count(*) AS count_order FROM (SELECT l_returnflag AS f, l_linestatus AS s, l_shipdate, s_region, s_nation, c_nation, p_type FROM orderlineitempartsupplier) t WHERE Dateisbeforeorequal(Datetime(`l_shipdate`), Dateminus(Datetime("1997-12-01"), Period("p90d"))) AND ( ( s_nation = 'FRANCE' AND c_nation = 'GERMANY' ) OR ( c_nation = 'FRANCE' AND s_nation = 'GERMANY' ) ) AND p_type = 'ECONOMY ANODIZED STEEL' GROUP BY f, s ORDER BY f, s
The optimized logical plan for this query is:
Sort [f#318 ASC,s#319 ASC], true +- Aggregate [f#318,s#319], [f#318,s#319,(count(1),mode=Complete,isDistinct=false) AS count_order#320L] +- Project [l_returnflag#337 AS f#318,l_linestatus#338 AS s#319] +- Filter ((UDF(UDF(l_shipdate#339),UDF(UDF(1997-12-01),UDF(P90D))) && (((s_nation#356 = FRANCE) && (c_nation#372 = GERMANY)) || ((c_nation#372 = FRANCE) && (s_nation#356 = GERMANY)))) && (p_type#361 = ECONOMY ANODIZED STEEL)) +- Relation[o_orderkey#321,o_custkey#322,o_orderstatus#323,o_totalprice#324,o_orderdate#325,o_orderpriority#326,o_clerk#327,o_shippriority#328,o_comment#329,l_partkey#330,l_suppkey#331,l_linenumber#332,l_quantity#333,l_extendedprice#334,l_discount#335,l_tax#336,l_returnflag#337,l_linestatus#338,l_shipdate#339,l_commitdate#340,l_receiptdate#341,l_shipinstruct#342,l_shipmode#343,l_comment#344,order_year#345,ps_partkey#346,ps_suppkey#347,ps_availqty#348,ps_supplycost#349,ps_comment#350,s_name#351,s_address#352,s_phone#353,s_acctbal#354,s_comment#355,s_nation#356,s_region#357,p_name#358,p_mfgr#359,p_brand#360,p_type#361,p_size#362,p_container#363,p_retailprice#364,p_comment#365,c_name#366,c_address#367,c_phone#368,c_acctbal#369,c_mktsegment#370,c_comment#371,c_nation#372,c_region#373] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
This is a typical Scan -> Filter -> Project -> Aggregate -> Sort query pattern. The physical plan for this query is shown below. The Filter, Aggregate and Sort operations are pushed to Druid as a GroupBy query.
Project [f#318,s#319,alias-1#375L AS count_order#320L] +- Scan DruidQuery(2137263483): { "q" : { "jsonClass" : "GroupByQuerySpec", "queryType" : "groupBy", "dataSource" : "tpch", "dimensions" : [ { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "l_returnflag", "outputName" : "f" }, { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "l_linestatus", "outputName" : "s" } ], "limitSpec" : { "jsonClass" : "LimitSpec", "type" : "default", "limit" : 2147483647, "columns" : [ { "jsonClass" : "OrderByColumnSpec", "dimension" : "f", "direction" : "ascending" }, { "jsonClass" : "OrderByColumnSpec", "dimension" : "s", "direction" : "ascending" } ] }, "granularity" : "all", "filter" : { "jsonClass" : "LogicalFilterSpec", "type" : "and", "fields" : [ { "jsonClass" : "LogicalFilterSpec", "type" : "or", "fields" : [ { "jsonClass" : "LogicalFilterSpec", "type" : "and", "fields" : [ { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "s_nation", "value" : "FRANCE" }, { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "c_nation", "value" : "GERMANY" } ] }, { "jsonClass" : "LogicalFilterSpec", "type" : "and", "fields" : [ { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "c_nation", "value" : "FRANCE" }, { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "s_nation", "value" : "GERMANY" } ] } ] }, { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "p_type", "value" : "ECONOMY ANODIZED STEEL" } ] }, "aggregations" : [ { "jsonClass" : "FunctionAggregationSpec", "type" : "longSum", "name" : "alias-1", "fieldName" : "count" } ], "intervals" : [ "1993-01-01T00:00:00.000Z/1997-09-02T00:00:00.001Z" ] }, "queryHistoricalServer" : false, "numSegmentsPerQuery" : -1, "intervalSplits" : [ { "start" : 725846400000, "end" : 873158400001 } ], "outputAttrSpec" : [ { "exprId" : { "id" : 318, "jvmId" : { } }, "name" : "f", "dataType" : { }, "tf" : 
"toString" }, { "exprId" : { "id" : 319, "jvmId" : { } }, "name" : "s", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 375, "jvmId" : { } }, "name" : "alias-1", "dataType" : { }, "tf" : "toLong" } ] }[f#318,s#319,alias-1#375L]
Grouping Set queries like Cube and Rollups are executed as a set of Druid Queries, one for each Grouping Set. A Union Operator is used to combine the output of all the Druid Queries.
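As a rough illustration of the one-query-per-Grouping-Set idea, the sketch below enumerates the grouping sets of a rollup and builds a skeleton Druid query for each. The helper names are hypothetical, the `grouping__id` bitmask encoding is an assumption chosen to match the ids 0, 1 and 3 that appear in the rollup plan on this page, and the queries omit `intervals`, so they are not complete Druid requests.

```python
def rollup_grouping_sets(columns):
    """Return (grouping_id, grouping_columns) pairs for a rollup over columns."""
    sets = []
    for n in range(len(columns) + 1):
        # Assumed encoding: bit i is set when column i is part of the set.
        grouping_id = sum(1 << i for i in range(n))
        sets.append((grouping_id, columns[:n]))
    return sets


def druid_query_for(grouping_columns, base_filter, aggregations):
    """Build a skeleton Druid query for one grouping set (intervals omitted)."""
    query = {
        "dataSource": "tpch",
        "granularity": "all",
        "filter": base_filter,
        "aggregations": aggregations,
    }
    if grouping_columns:
        query["queryType"] = "groupBy"
        query["dimensions"] = [
            {"type": "default", "dimension": c, "outputName": c}
            for c in grouping_columns
        ]
    else:
        # The empty grouping set needs no dimensions.
        query["queryType"] = "timeseries"
    return query


base_filter = {"type": "selector", "dimension": "s_nation", "value": "FRANCE"}
for gid, cols in rollup_grouping_sets(["l_returnflag", "l_linestatus"]):
    print(gid, druid_query_for(cols, base_filter, [])["queryType"], cols)
```

The outputs of the individual queries would then be combined with a Union, with the missing grouping columns projected as null.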
For example:
select l_returnflag, l_linestatus, grouping__id, count(*), sum(l_extendedprice) as s from orderLineItemPartSupplier where s_nation = 'FRANCE' group by l_returnflag, l_linestatus with rollup
The optimized logical plan for this query is:
Aggregate [l_returnflag#374,l_linestatus#375,grouping__id#373], [l_returnflag#374,l_linestatus#375,grouping__id#373,(count(1),mode=Complete,isDistinct=false) AS _c3#372L,(sum(l_extendedprice#332),mode=Complete,isDistinct=false) AS s#318] +- Project [l_returnflag#374,l_linestatus#375,grouping__id#373,l_extendedprice#332] +- Expand [List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, c_nation#370, c_region#371, null, null, 0),List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, 
c_nation#370, c_region#371, l_returnflag#374, null, 1),List(o_orderkey#319, o_custkey#320, o_orderstatus#321, o_totalprice#322, o_orderdate#323, o_orderpriority#324, o_clerk#325, o_shippriority#326, o_comment#327, l_partkey#328, l_suppkey#329, l_linenumber#330, l_quantity#331, l_extendedprice#332, l_discount#333, l_tax#334, l_returnflag#335, l_linestatus#336, l_shipdate#337, l_commitdate#338, l_receiptdate#339, l_shipinstruct#340, l_shipmode#341, l_comment#342, order_year#343, ps_partkey#344, ps_suppkey#345, ps_availqty#346, ps_supplycost#347, ps_comment#348, s_name#349, s_address#350, s_phone#351, s_acctbal#352, s_comment#353, s_nation#354, s_region#355, p_name#356, p_mfgr#357, p_brand#358, p_type#359, p_size#360, p_container#361, p_retailprice#362, p_comment#363, c_name#364, c_address#365, c_phone#366, c_acctbal#367, c_mktsegment#368, c_comment#369, c_nation#370, c_region#371, l_returnflag#374, l_linestatus#375, 3)], [o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371,l_returnflag#374,l_linestatus#375,grouping__id#373] +- Project 
[o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371,l_returnflag#335 AS l_returnflag#374,l_linestatus#336 AS l_linestatus#375] +- Filter (s_nation#354 = FRANCE) +- Relation[o_orderkey#319,o_custkey#320,o_orderstatus#321,o_totalprice#322,o_orderdate#323,o_orderpriority#324,o_clerk#325,o_shippriority#326,o_comment#327,l_partkey#328,l_suppkey#329,l_linenumber#330,l_quantity#331,l_extendedprice#332,l_discount#333,l_tax#334,l_returnflag#335,l_linestatus#336,l_shipdate#337,l_commitdate#338,l_receiptdate#339,l_shipinstruct#340,l_shipmode#341,l_comment#342,order_year#343,ps_partkey#344,ps_suppkey#345,ps_availqty#346,ps_supplycost#347,ps_comment#348,s_name#349,s_address#350,s_phone#351,s_acctbal#352,s_comment#353,s_nation#354,s_region#355,p_name#356,p_mfgr#357,p_brand#358,p_type#359,p_size#360,p_container#361,p_retailprice#362,p_comment#363,c_name#364,c_address#365,c_phone#366,c_acctbal#367,c_mktsegment#368,c_comment#369,c_nation#370,c_region#371] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
The physical plan for this query is shown below.
Union :- Project [null AS l_returnflag#374,null AS l_linestatus#375,0 AS grouping__id#373,alias-1#376L AS _c3#372L,alias-2#377 AS s#318] : +- Scan DruidQuery(63544302): { "q" : { "jsonClass" : "TimeSeriesQuerySpec", "queryType" : "timeseries", "dataSource" : "tpch", "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ], "granularity" : "all", "filter" : { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "s_nation", "value" : "FRANCE" }, "aggregations" : [ { "jsonClass" : "FunctionAggregationSpec", "type" : "longSum", "name" : "alias-1", "fieldName" : "count" }, { "jsonClass" : "FunctionAggregationSpec", "type" : "doubleSum", "name" : "alias-2", "fieldName" : "l_extendedprice" } ] }, "queryHistoricalServer" : false, "numSegmentsPerQuery" : -1, "intervalSplits" : [ { "start" : 725846400000, "end" : 883526401000 } ], "outputAttrSpec" : [ { "exprId" : { "id" : 376, "jvmId" : { } }, "name" : "alias-1", "dataType" : { }, "tf" : "toLong" }, { "exprId" : { "id" : 377, "jvmId" : { } }, "name" : "alias-2", "dataType" : { }, "tf" : "" } ] }[alias-1#376L,alias-2#377] :- Project [l_returnflag#374,null AS l_linestatus#375,1 AS grouping__id#373,alias-3#378L AS _c3#372L,alias-4#379 AS s#318] : +- Scan DruidQuery(72665178): { "q" : { "jsonClass" : "GroupByQuerySpec", "queryType" : "groupBy", "dataSource" : "tpch", "dimensions" : [ { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "l_returnflag", "outputName" : "l_returnflag" } ], "granularity" : "all", "filter" : { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "s_nation", "value" : "FRANCE" }, "aggregations" : [ { "jsonClass" : "FunctionAggregationSpec", "type" : "longSum", "name" : "alias-3", "fieldName" : "count" }, { "jsonClass" : "FunctionAggregationSpec", "type" : "doubleSum", "name" : "alias-4", "fieldName" : "l_extendedprice" } ], "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ] }, "queryHistoricalServer" : false, 
"numSegmentsPerQuery" : -1, "intervalSplits" : [ { "start" : 725846400000, "end" : 883526401000 } ], "outputAttrSpec" : [ { "exprId" : { "id" : 374, "jvmId" : { } }, "name" : "l_returnflag", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 378, "jvmId" : { } }, "name" : "alias-3", "dataType" : { }, "tf" : "toLong" }, { "exprId" : { "id" : 379, "jvmId" : { } }, "name" : "alias-4", "dataType" : { }, "tf" : "" } ] }[l_returnflag#374,alias-3#378L,alias-4#379] +- Project [l_returnflag#374,l_linestatus#375,3 AS grouping__id#373,alias-5#380L AS _c3#372L,alias-6#381 AS s#318] +- Scan DruidQuery(1269235678): { "q" : { "jsonClass" : "GroupByQuerySpec", "queryType" : "groupBy", "dataSource" : "tpch", "dimensions" : [ { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "l_returnflag", "outputName" : "l_returnflag" }, { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "l_linestatus", "outputName" : "l_linestatus" } ], "granularity" : "all", "filter" : { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "s_nation", "value" : "FRANCE" }, "aggregations" : [ { "jsonClass" : "FunctionAggregationSpec", "type" : "longSum", "name" : "alias-5", "fieldName" : "count" }, { "jsonClass" : "FunctionAggregationSpec", "type" : "doubleSum", "name" : "alias-6", "fieldName" : "l_extendedprice" } ], "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ] }, "queryHistoricalServer" : false, "numSegmentsPerQuery" : -1, "intervalSplits" : [ { "start" : 725846400000, "end" : 883526401000 } ], "outputAttrSpec" : [ { "exprId" : { "id" : 374, "jvmId" : { } }, "name" : "l_returnflag", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 375, "jvmId" : { } }, "name" : "l_linestatus", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 380, "jvmId" : { } }, "name" : "alias-5", "dataType" : { }, "tf" : "toLong" }, { "exprId" : { "id" : 381, "jvmId" : { } }, "name" : "alias-6", "dataType" : { }, "tf" : 
"" } ] }[l_returnflag#374,l_linestatus#375,alias-5#380L,alias-6#381]
Queries that involve joining 1 or more Dimension tables with a Fact table where all the dimension and fact columns are part of the Druid Index are executed as a single Druid Query.
For example, in the TPCH use case all the dimensions and measures are indexed. Consider TPCH Query 3:
select o_orderkey, sum(l_extendedprice) as price, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and dateIsBefore(dateTime(`o_orderdate`),dateTime("1995-03-15")) and dateIsAfter(dateTime(`l_shipdate`),dateTime("1995-03-15")) and c_custkey = o_custkey and l_orderkey = o_orderkey group by o_orderkey, o_orderdate, o_shippriority
The optimized Logical Plan joins Customer, Orders and LineItem tables and then performs an Aggregation.
Aggregate [o_orderkey#414,o_orderdate#418,o_shippriority#421], [o_orderkey#414,(sum(l_extendedprice#732),mode=Complete,isDistinct=false) AS price#726,o_orderdate#418,o_shippriority#421] +- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421,l_extendedprice#732] +- Join Inner, Some((l_orderkey#727 = o_orderkey#414)) :- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421] : +- Join Inner, Some((c_custkey#594 = o_custkey#415)) : :- Project [c_custkey#594] : : +- Filter (c_mktsegment#600 = BUILDING) : : +- InMemoryRelation [c_custkey#594,c_name#595,c_address#596,c_nationkey#597,c_phone#598,c_acctbal#599,c_mktsegment#600,c_comment#601], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None : +- Project [o_orderkey#414,o_orderdate#418,o_shippriority#421,o_custkey#415] : +- Filter UDF(UDF(o_orderdate#418),UDF(1995-03-15)) : +- InMemoryRelation [o_orderkey#414,o_custkey#415,o_orderstatus#416,o_totalprice#417,o_orderdate#418,o_orderpriority#419,o_clerk#420,o_shippriority#421,o_comment#422], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None +- Project [l_extendedprice#732,l_orderkey#727] +- Filter UDF(UDF(l_shipdate#737),UDF(1995-03-15)) +- Relation[l_orderkey#727,l_partkey#728,l_suppkey#729,l_linenumber#730,l_quantity#731,l_extendedprice#732,l_discount#733,l_tax#734,l_returnflag#735,l_linestatus#736,l_shipdate#737,l_commitdate#738,l_receiptdate#739,l_shipinstruct#740,l_shipmode#741,l_comment#742] DruidRelationInfo(fullName = DruidRelationName(lineitem,localhost,tpch), sourceDFName = lineItemBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
The physical plan for this query is shown below. The Join and Aggregate operations are pushed to Druid as a single GroupBy query.
Project [cast(o_orderkey#414 as int) AS o_orderkey#414,alias-1#828 AS price#726,o_orderdate#418,cast(o_shippriority#421 as int) AS o_shippriority#421] +- Scan DruidQuery(963099648): { "q" : { "jsonClass" : "GroupByQuerySpec", "queryType" : "groupBy", "dataSource" : "tpch", "dimensions" : [ { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "o_orderkey", "outputName" : "o_orderkey" }, { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "o_orderdate", "outputName" : "o_orderdate" }, { "jsonClass" : "DefaultDimensionSpec", "type" : "default", "dimension" : "o_shippriority", "outputName" : "o_shippriority" } ], "granularity" : "all", "filter" : { "jsonClass" : "LogicalFilterSpec", "type" : "and", "fields" : [ { "jsonClass" : "SelectorFilterSpec", "type" : "selector", "dimension" : "c_mktsegment", "value" : "BUILDING" }, { "jsonClass" : "JavascriptFilterSpec", "type" : "javascript", "dimension" : "o_orderdate", "function" : "function(x) { return(x < '1995-03-15T00:00:00.000Z') }" } ] }, "aggregations" : [ { "jsonClass" : "FunctionAggregationSpec", "type" : "doubleSum", "name" : "alias-1", "fieldName" : "l_extendedprice" } ], "intervals" : [ "1995-03-15T00:00:00.001Z/1997-12-31T00:00:01.000Z" ] }, "queryHistoricalServer" : false, "numSegmentsPerQuery" : -1, "intervalSplits" : [ { "start" : 795225600001, "end" : 883526401000 } ], "outputAttrSpec" : [ { "exprId" : { "id" : 414, "jvmId" : { } }, "name" : "o_orderkey", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 418, "jvmId" : { } }, "name" : "o_orderdate", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 421, "jvmId" : { } }, "name" : "o_shippriority", "dataType" : { }, "tf" : "toString" }, { "exprId" : { "id" : 828, "jvmId" : { } }, "name" : "alias-1", "dataType" : { }, "tf" : "" } ] }[o_orderkey#414,o_orderdate#418,o_shippriority#421,alias-1#828]
Unlike the other patterns this involves a Logical transformation. A common analytical pattern is to compare Aggregates between different levels of Aggregation. The unoptimized logical plan for this query performs a Join on the output of the first Aggregation. The Physical Plan for this Logical Plan would only be able to push the inner Aggregate to Druid, and would entail a costly Scan of the Fact table in Spark.
For example, the following computes the overall min. linenumber, the total account balance and the count, and then uses these statistics when computing metrics by linenumber.
select r1.l_linenumber x, sum(r1.l_quantity) y, (mi + 10) as z from orderLineItemPartSupplier r1 join (select min(l_linenumber) mi, sum(c_acctbal) ma, count(1) from orderLineItemPartSupplier where not(l_shipdate is null) having count(1) > 0 ) r2 group by r1.l_linenumber, (mi + 10) order by x, y, z
The unoptimized plan for this query is:
Sort [x#320 ASC,y#321 ASC,z#322 ASC], true +- Aggregate [l_linenumber#334,(mi#318 + 10)], [l_linenumber#334 AS x#320,(sum(l_quantity#335),mode=Complete,isDistinct=false) AS y#321,(mi#318 + 10) AS z#322] +- Join Inner, None :- Subquery r1 : +- Subquery orderlineitempartsupplier : +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1))) +- Subquery r2 +- Project [mi#318,ma#319,_c2#376L] +- Filter havingCondition#377: boolean +- Aggregate [cast(((count(1),mode=Complete,isDistinct=false) > cast(0 as bigint)) as boolean) AS havingCondition#377,(min(l_linenumber#334),mode=Complete,isDistinct=false) AS mi#318,(sum(c_acctbal#371),mode=Complete,isDistinct=false) AS ma#319,(count(1),mode=Complete,isDistinct=false) AS _c2#376L] +- Filter NOT isnull(l_shipdate#341) +- Subquery orderlineitempartsupplier +- 
Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
This Logical Optimization pushes the outer Aggregation below the Join. Now we perform 2 very fast aggregations in Druid followed by a Join of very small aggregated results.
+- Project [x#320,y#321,(mi#318 + 10) AS z#322] +- Join Inner, None :- Aggregate [l_linenumber#334], [l_linenumber#334 AS x#320,(sum(l_quantity#335),mode=Complete,isDistinct=false) AS y#321] : +- Project [l_linenumber#334,l_quantity#335] : +- Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1))) +- Project [mi#318] +- Filter havingCondition#377: boolean +- Aggregate [((count(1),mode=Complete,isDistinct=false) > 0) AS havingCondition#377,(min(l_linenumber#334),mode=Complete,isDistinct=false) AS mi#318,(sum(c_acctbal#371),mode=Complete,isDistinct=false) AS ma#319,(count(1),mode=Complete,isDistinct=false) AS _c2#376L] +- Project [l_linenumber#334,c_acctbal#371] +- Filter NOT isnull(l_shipdate#341) +- 
Relation[o_orderkey#323,o_custkey#324,o_orderstatus#325,o_totalprice#326,o_orderdate#327,o_orderpriority#328,o_clerk#329,o_shippriority#330,o_comment#331,l_partkey#332,l_suppkey#333,l_linenumber#334,l_quantity#335,l_extendedprice#336,l_discount#337,l_tax#338,l_returnflag#339,l_linestatus#340,l_shipdate#341,l_commitdate#342,l_receiptdate#343,l_shipinstruct#344,l_shipmode#345,l_comment#346,order_year#347,ps_partkey#348,ps_suppkey#349,ps_availqty#350,ps_supplycost#351,ps_comment#352,s_name#353,s_address#354,s_phone#355,s_acctbal#356,s_comment#357,s_nation#358,s_region#359,p_name#360,p_mfgr#361,p_brand#362,p_type#363,p_size#364,p_container#365,p_retailprice#366,p_comment#367,c_name#368,c_address#369,c_phone#370,c_acctbal#371,c_mktsegment#372,c_comment#373,c_nation#374,c_region#375] DruidRelationInfo(fullName = DruidRelationName(orderLineItemPartSupplier,localhost,tpch), sourceDFName = orderLineItemPartSupplierBase, timeDimensionCol = l_shipdate, options = DruidRelationOptions(1000000,100000,true,true,true,30000,true,/druid,false,true,2147483647,Some(1)))
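To see why pushing the outer Aggregation below the Join preserves the result, the toy sketch below evaluates both plan shapes over a few in-memory rows. The data and function names are made up for illustration, the having clause is ignored, and the equivalence relies on the statistics side being a single row, as it is in the query above.

```python
from collections import defaultdict

# Hypothetical fact rows; only the columns the query needs.
rows = [
    {"l_linenumber": 1, "l_quantity": 10.0},
    {"l_linenumber": 1, "l_quantity": 5.0},
    {"l_linenumber": 2, "l_quantity": 7.0},
    {"l_linenumber": 3, "l_quantity": 2.0},
]


def stats(rows):
    """The r2 side: one row of overall statistics."""
    return {"mi": min(r["l_linenumber"] for r in rows)}


def join_then_aggregate(rows):
    """Unoptimized shape: join every fact row with r2, then group."""
    r2 = stats(rows)
    joined = [{**r, **r2} for r in rows]
    groups = defaultdict(float)
    for r in joined:
        groups[(r["l_linenumber"], r["mi"] + 10)] += r["l_quantity"]
    return sorted((x, y, z) for (x, z), y in groups.items())


def aggregate_then_join(rows):
    """Optimized shape: group the fact rows first, then join the small results."""
    groups = defaultdict(float)
    for r in rows:
        groups[r["l_linenumber"]] += r["l_quantity"]
    r2 = stats(rows)
    return sorted((x, y, r2["mi"] + 10) for x, y in groups.items())


print(join_then_aggregate(rows) == aggregate_then_join(rows))
```

In the optimized shape both aggregations run over the fact data independently, which is what allows each of them to be issued as a Druid query.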
Predicates on dimension columns are pushed to Druid as filter specifications. There are 3 types of transformations.
- Simple comparison expressions of the form name = 'X' or size > 10 are transformed into Druid Selector or Bound filters.
- Logical operators: And, Or and Not are translated to the equivalent Druid operators.
- In and InSet are transformed to an ExtractionFilter in Druid.
- IsNull is translated to a Selector filter on the empty string.
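The sketch below illustrates these translations on a tiny expression tree. The tuple encoding of predicates and the function names are hypothetical, the `bound` field names follow the Druid filter documentation as I understand it (older Druid releases may differ), and the In/InSet case is left out. The output omits the `jsonClass` fields that appear in the plans on this page.

```python
def selector(dimension, value):
    return {"type": "selector", "dimension": dimension, "value": value}


def bound(dimension, op, value):
    spec = {"type": "bound", "dimension": dimension, "ordering": "numeric"}
    if op in (">", ">="):
        spec["lower"] = str(value)
        spec["lowerStrict"] = op == ">"
    else:
        spec["upper"] = str(value)
        spec["upperStrict"] = op == "<"
    return spec


def to_druid_filter(expr):
    """Translate a predicate tuple such as ("=", "name", "X") to a filter spec."""
    op = expr[0]
    if op == "=":
        return selector(expr[1], expr[2])
    if op in ("<", "<=", ">", ">="):
        return bound(expr[1], op, expr[2])
    if op in ("and", "or"):
        return {"type": op, "fields": [to_druid_filter(e) for e in expr[1:]]}
    if op == "not":
        return {"type": "not", "field": to_druid_filter(expr[1])}
    if op == "isnull":
        # Per the list above: IsNull becomes a selector on the empty string.
        return selector(expr[1], "")
    raise ValueError(f"unsupported predicate: {op}")


predicate = (
    "and",
    ("or",
     ("and", ("=", "s_nation", "FRANCE"), ("=", "c_nation", "GERMANY")),
     ("and", ("=", "c_nation", "FRANCE"), ("=", "s_nation", "GERMANY"))),
    ("=", "p_type", "ECONOMY ANODIZED STEEL"),
)
print(to_druid_filter(predicate))
```

The example predicate is the dimension part of the first query on this page; its time predicate is handled separately as an interval.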
Certain Predicates on the time Dimension are transformed into intervals on the Druid Query:
- Predicates of the form timeDim compOp literal or literal compOp timeDim are converted into an interval on the Druid Query. The literal can be a String, Date or Timestamp.
- Invocations of date comparison functions from the spark-datetime package are also converted to an interval on the Druid Query.
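A minimal sketch of the interval narrowing, assuming half-open Druid intervals and a one-millisecond adjustment for inclusive or strict bounds, which is consistent with the intervals visible in the physical plans on this page. The function names are hypothetical; the index bounds are the ones shown in those plans, and timestamps are treated as UTC.

```python
from datetime import datetime, timedelta

ONE_MS = timedelta(milliseconds=1)


def narrow(interval, op, literal):
    """Narrow a half-open [start, end) interval by 'timeDim op literal'."""
    start, end = interval
    if op == "<":
        end = min(end, literal)
    elif op == "<=":
        end = min(end, literal + ONE_MS)    # include the literal itself
    elif op == ">":
        start = max(start, literal + ONE_MS)  # exclude the literal itself
    elif op == ">=":
        start = max(start, literal)
    else:
        raise ValueError(f"unsupported operator: {op}")
    return (start, end)


def to_druid_interval(interval):
    """Format as ISO-8601 start/end with millisecond precision."""
    def fmt(ts):
        return ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z"
    return "/".join(fmt(ts) for ts in interval)


# Bounds of the indexed data, as they appear in the plans on this page.
INDEX_BOUNDS = (datetime(1993, 1, 1), datetime(1997, 12, 31, 0, 0, 1))

# l_shipdate <= 1997-12-01 minus 90 days (the first example query).
cutoff = datetime(1997, 12, 1) - timedelta(days=90)
print(to_druid_interval(narrow(INDEX_BOUNDS, "<=", cutoff)))
```

With these assumptions the first example query yields the same interval as its physical plan, and a strict `>` on 1995-03-15 reproduces the interval in the TPCH Query 3 plan.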
This is a catch-all transformation; it handles expressions that don’t fall into the above 2 categories. As of this writing it handles the following expressions:
- Attribute references
- Literals, including null values
- Cast expressions
- String functions: Concat, Lower, Upper, Substring
- Case invocations
- the Coalesce function
- Comparisons: <, <=, =, !=, >, >=, in, not in, like, stringPredicate
- Date functions: toDate, DateAdd, DateSub, DateDiff, Year, Quarter, Month, DayOfMonth, WeekOfYear, Hour, Minute, Second, FromUnixTime, UnixTimestamp.
- Arithmetic functions: +, -, *, /, mod, remainder, abs, floor, ceil, log, sqrt, unaryminus, unaryplus
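As an illustration of the Javascript fallback, the sketch below generates a Druid `javascript` filter for a single comparison. It is a simplification under stated assumptions: the helper names are hypothetical, only comparisons against a literal are covered, and the function text is formatted to match the Javascript filter visible in the TPCH Query 3 plan on this page.

```python
JS_OPS = {"<": "<", "<=": "<=", "=": "==", "!=": "!=", ">": ">", ">=": ">="}


def js_literal(value):
    """Render a Python value as a Javascript literal (strings single-quoted)."""
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"
    return str(value)


def javascript_filter(dimension, op, literal):
    """Build a Druid javascript filter for 'dimension op literal'."""
    body = f"function(x) {{ return(x {JS_OPS[op]} {js_literal(literal)}) }}"
    return {"type": "javascript", "dimension": dimension, "function": body}


print(javascript_filter("o_orderdate", "<", "1995-03-15T00:00:00.000Z"))
```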
Group Expressions that contain dimension columns are pushed to Druid as Dimension specifications. There are 3 types of transformations.
This is a straightforward transformation of a group expression on a dimension column in SQL into a Druid DefaultDimensionSpec.
We attempt to transform expressions on the time dimension into a Druid TimeParsingExtraction or TimeFormatExtraction. We handle the following functions:
- AttributeReferences
- Casts to DateType, Timestamp
- ToDate
- DateTime element functions: Year, DayOfMonth, DayOfYear, Month, WeekOfYear, Hour, Minute, Second, UnixTimestamp, FromUnixTime, FromUTCTimestamp, ToUTCTimestamp.
We also attempt to transform spark-datetime package expressions to use the Druid time extraction specs. We handle the following functions: era, centuryOfEra, yearOfEra, yearOfCentury, year, weekyear, monthOfYear, monthOfYearName, weekOfWeekyear, dayOfYear, dayOfMonth, dayOfWeek, dayOfWeekName, hourOfDay, secondOfMinute
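A rough sketch of what a time extraction dimension might look like, assuming Druid's `extraction` dimension spec with a `timeFormat` extraction function applied to the `__time` column. The mapping from SQL functions to Joda-style format patterns is an assumption made for illustration (the patterns actually chosen by the planner are not documented here), and the helper name is hypothetical.

```python
# Assumed Joda-style patterns for a few date element functions.
TIME_FORMATS = {
    "year": "yyyy",
    "month": "MM",
    "dayofmonth": "dd",
    "hour": "HH",
    "minute": "mm",
    "second": "ss",
}


def time_extraction_dimension(function, output_name, time_zone="UTC"):
    """Build an extraction dimension spec for a function on the time dimension."""
    return {
        "type": "extraction",
        "dimension": "__time",
        "outputName": output_name,
        "extractionFn": {
            "type": "timeFormat",
            "format": TIME_FORMATS[function.lower()],
            "timeZone": time_zone,
        },
    }


print(time_extraction_dimension("Year", "ship_year"))
```

Because the extraction returns strings (for example a zero-padded month), a projection on the Spark side would still need to cast the value to the SQL result type.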
This is a catch-all transformation; it handles expressions that don’t fall into the above 2 categories. It handles all the patterns listed in the Filter Expressions section.
Aggregate expressions in SQL are transformed into Aggregate Specifications on the Druid Query. There are 2 forms of transformations.
- Native function transforms: Simple patterns of the form aggFunc(metric) are transformed into a Druid FunctionAggregation spec.
- Javascript transforms: Any other expression that contains any of the subexpressions listed for Filter Javascript is handled by this case. These expressions can reference both dimension and metric columns. Such an expression is transformed into a Javascript Aggregation Spec in Druid.
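The two forms can be sketched as follows. The helper names and the type lookup table are hypothetical; the native spec shape (`type`, `name`, `fieldName`) matches the aggregations in the plans on this page, including `count(*)` being issued as a `longSum` over the `count` metric, and the Javascript spec uses the `fnAggregate`/`fnCombine`/`fnReset` fields of Druid's JavaScript aggregator. Only a sum over an expression is shown for the Javascript case.

```python
# (SQL aggregate, metric type) -> Druid native aggregator type.
NATIVE = {
    ("sum", "long"): "longSum",
    ("sum", "double"): "doubleSum",
    ("min", "long"): "longMin",
    ("min", "double"): "doubleMin",
    ("max", "long"): "longMax",
    ("max", "double"): "doubleMax",
}


def native_aggregation(func, metric, metric_type, name):
    """aggFunc(metric) as a native Druid aggregation spec."""
    return {"type": NATIVE[(func, metric_type)], "name": name, "fieldName": metric}


def count_star(name, count_metric="count"):
    """count(*) over rolled-up data: sum the stored row-count metric."""
    return {"type": "longSum", "name": name, "fieldName": count_metric}


def javascript_sum(name, columns, js_expression):
    """sum(<expression over columns>) as a Druid javascript aggregation spec."""
    args = ", ".join(columns)
    return {
        "type": "javascript",
        "name": name,
        "fieldNames": list(columns),
        "fnAggregate": f"function(current, {args}) {{ return current + ({js_expression}); }}",
        "fnCombine": "function(partialA, partialB) { return partialA + partialB; }",
        "fnReset": "function() { return 0; }",
    }


print(native_aggregation("sum", "l_extendedprice", "double", "alias-2"))
print(javascript_sum("alias-3", ["l_extendedprice", "l_discount"],
                     "l_extendedprice * (1 - l_discount)"))
```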