Allow merging consecutive 'long' values in SortedRangeSet #3316

Closed
rzeyde-varada wants to merge 1 commit

Conversation

rzeyde-varada (Contributor) commented Apr 2, 2020

This can be useful when pushing down TupleDomains generated by dynamic filtering.

Fixes #6076
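
For illustration, here is a minimal standalone sketch of the merging idea (my own example, not the PR's actual code): consecutive long values collapse into inclusive ranges, so IN (1999, 2000, 2001) becomes the single range [1999, 2001].

import java.util.ArrayList;
import java.util.List;

public class MergeConsecutive
{
    // A [low, high] range over long values, both bounds inclusive.
    record LongRange(long low, long high) {}

    // Collapse sorted, distinct values into ranges by merging neighbors
    // that differ by exactly 1 (assumes no overflow at Long.MAX_VALUE).
    static List<LongRange> merge(long[] sortedDistinctValues)
    {
        List<LongRange> ranges = new ArrayList<>();
        for (long value : sortedDistinctValues) {
            int last = ranges.size() - 1;
            if (last >= 0 && ranges.get(last).high() + 1 == value) {
                ranges.set(last, new LongRange(ranges.get(last).low(), value));
            }
            else {
                ranges.add(new LongRange(value, value));
            }
        }
        return ranges;
    }

    public static void main(String[] args)
    {
        // Prints [LongRange[low=1999, high=2001], LongRange[low=2005, high=2005]]
        System.out.println(merge(new long[] {1999, 2000, 2001, 2005}));
    }
}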

cla-bot added the cla-signed label Apr 2, 2020
rzeyde-varada force-pushed the merge-longs branch 4 times, most recently from f5210e6 to 1f57c13 (April 3, 2020)
rzeyde-varada (Contributor, Author) commented Apr 4, 2020

Two of the TestTpcdsCostBasedPlan tests failed due to a join order flip (log): q46 and q68.

It seems that changing the ("date_dim"."d_year" IN (1999, (1999 + 1), (1999 + 2))) predicate to ("date_dim"."d_year" >= 1999 AND "date_dim"."d_year" <= 1999 + 2) causes the plan change.

rzeyde-varada (Contributor, Author) commented Apr 4, 2020

It also seems that statistics estimation is different after rewriting the query:

presto:tiny> EXPLAIN ANALYZE SELECT d_date_sk FROM date_dim WHERE d_year BETWEEN 1999 AND 2001;
                                                                                               Query Plan                                                                                               
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [SOURCE]                                                                                                                                                                                    
     CPU: 497.52ms, Scheduled: 527.01ms, Input: 73049 rows (0B); per task: avg.: 73049.00 std.dev.: 0.00, Output: 1096 rows (9.63kB)                                                                    
     Output layout: [d_date_sk]                                                                                                                                                                         
     Output partitioning: SINGLE []                                                                                                                                                                     
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                      
     ScanFilterProject[table = tpcds:date_dim:sf0.01, grouped = false, filterPredicate = ("d_year" BETWEEN 1999 AND 2001)]                                                                              
         Layout: [d_date_sk:bigint]                                                                                                                                                                     
         Estimates: {rows: 73049 (642.03kB), cpu: 998.72k, memory: 0B, network: 0B}/{rows: 730 (6.42kB), cpu: 1.95M, memory: 0B, network: 0B}/{rows: 730 (6.42kB), cpu: 1.96M, memory: 0B, network: 0B} 
         CPU: 497.00ms (100.00%), Scheduled: 526.00ms (100.00%), Output: 1096 rows (9.63kB)                                                                                                             
         Input avg.: 18262.25 rows, Input std.dev.: 173.21%                                                                                                                                             
         d_date_sk := tpcds:d_date_sk                                                                                                                                                                   
         d_year := tpcds:d_year                                                                                                                                                                         
         Input: 73049 rows (0B), Filtered: 98.50%                                                                                                                                                       


presto:tiny> EXPLAIN ANALYZE SELECT d_date_sk FROM date_dim WHERE d_year IN (1999, 2000, 2001);
                                                                                                Query Plan                                                                                                
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [SOURCE]                                                                                                                                                                                      
     CPU: 591.47ms, Scheduled: 682.71ms, Input: 73049 rows (0B); per task: avg.: 73049.00 std.dev.: 0.00, Output: 1096 rows (9.63kB)                                                                      
     Output layout: [d_date_sk]                                                                                                                                                                           
     Output partitioning: SINGLE []                                                                                                                                                                       
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                        
     ScanFilterProject[table = tpcds:date_dim:sf0.01, grouped = false, filterPredicate = ("d_year" IN (1999, 2000, 2001))]                                                                                
         Layout: [d_date_sk:bigint]                                                                                                                                                                       
         Estimates: {rows: 73049 (642.03kB), cpu: 998.72k, memory: 0B, network: 0B}/{rows: 1090 (9.58kB), cpu: 1.95M, memory: 0B, network: 0B}/{rows: 1090 (9.58kB), cpu: 1.96M, memory: 0B, network: 0B} 
         CPU: 591.00ms (100.00%), Scheduled: 682.00ms (100.00%), Output: 1096 rows (9.63kB)                                                                                                               
         Input avg.: 18262.25 rows, Input std.dev.: 173.21%                                                                                                                                               
         d_date_sk := tpcds:d_date_sk                                                                                                                                                                     
         d_year := tpcds:d_year                                                                                                                                                                           
         Input: 73049 rows (0B), Filtered: 98.50%                                                                                                                                                                 

For small IN predicates, the range-based estimation may underestimate the actual statistics:

presto:tiny> EXPLAIN SELECT d_year FROM date_dim WHERE d_year >= 2000 AND d_year <= 2001;
                                                                    Query Plan                                                                    
--------------------------------------------------------------------------------------------------------------------------------------------------
 Output[d_year]                                                                                                                                   
 │   Layout: [d_year:integer]                                                                                                                     
 │   Estimates: {rows: 365 (1.78kB), cpu: 713.37k, memory: 0B, network: 1.78kB}                                                                   
 └─ RemoteExchange[GATHER]                                                                                                                        
    │   Layout: [d_year:integer]                                                                                                                  
    │   Estimates: {rows: 365 (1.78kB), cpu: 713.37k, memory: 0B, network: 1.78kB}                                                                
    └─ ScanFilter[table = tpcds:date_dim:sf0.01, filterPredicate = ("d_year" BETWEEN 2000 AND 2001)]                                              
           Layout: [d_year:integer]                                                                                                               
           Estimates: {rows: 73049 (356.68kB), cpu: 356.68k, memory: 0B, network: 0B}/{rows: 365 (1.78kB), cpu: 713.37k, memory: 0B, network: 0B} 
           d_year := tpcds:d_year                                                                                                                 

presto:tiny> EXPLAIN SELECT d_year FROM date_dim WHERE d_year IN (2000, 2001);
                                                                    Query Plan                                                                    
--------------------------------------------------------------------------------------------------------------------------------------------------
 Output[d_year]                                                                                                                                   
 │   Layout: [d_year:integer]                                                                                                                     
 │   Estimates: {rows: 727 (3.55kB), cpu: 713.37k, memory: 0B, network: 3.55kB}                                                                   
 └─ RemoteExchange[GATHER]                                                                                                                        
    │   Layout: [d_year:integer]                                                                                                                  
    │   Estimates: {rows: 727 (3.55kB), cpu: 713.37k, memory: 0B, network: 3.55kB}                                                                
    └─ ScanFilter[table = tpcds:date_dim:sf0.01, filterPredicate = ("d_year" IN (2000, 2001))]                                                    
           Layout: [d_year:integer]                                                                                                               
           Estimates: {rows: 73049 (356.68kB), cpu: 356.68k, memory: 0B, network: 0B}/{rows: 727 (3.55kB), cpu: 713.37k, memory: 0B, network: 0B} 
           d_year := tpcds:d_year                                                                                                                 
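
A plausible reading of the estimates above (my reconstruction; I haven't traced the estimator code): assuming d_year in date_dim spans 1900-2100 as in standard TPC-DS (201 distinct values over a range of width 200, 73049 rows total), a range predicate appears to be estimated as rows * (high - low) / (max - min), while an IN list is estimated as rows * count / NDV:

  • d_year BETWEEN 1999 AND 2001: 73049 * 2 / 200 ≈ 730 rows
  • d_year IN (1999, 2000, 2001): 73049 * 3 / 201 ≈ 1090 rows
  • d_year BETWEEN 2000 AND 2001: 73049 * 1 / 200 ≈ 365 rows
  • d_year IN (2000, 2001): 73049 * 2 / 201 ≈ 727 rows

Since the actual output is 1096 rows (~365 per year), the discrete estimate is nearly exact, while the range estimate counts one fewer "year-width" than the number of discrete values it covers - hence the under-estimation noted below.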

martint (Member) commented Apr 5, 2020

> Two of the TestTpcdsCostBasedPlan tests failed due to a join order flip (log): q46 and q68.

I haven't looked at the plans, but is the new plan "better"?

martint (Member) commented Apr 5, 2020

> It seems that changing the ("date_dim"."d_year" IN (1999, (1999 + 1), (1999 + 2))) predicate to ("date_dim"."d_year" >= 1999 AND "date_dim"."d_year" <= 1999 + 2) causes the plan change.

That seems like a bug (or inconsistency) in the cost estimator. For integer types, it's clearly equivalent.

findepi (Member) commented Apr 5, 2020

> > It seems that changing the ("date_dim"."d_year" IN (1999, (1999 + 1), (1999 + 2))) predicate to ("date_dim"."d_year" >= 1999 AND "date_dim"."d_year" <= 1999 + 2) causes the plan change.
>
> That seems like a bug (or inconsistency) in the cost estimator. For integer types, it's clearly equivalent.

@martint there is no logic implemented that could recognize these are equivalent.
So, while indeed equivalent, they will hit different code paths today.

Of course, this is something to improve, so the obvious question is: how big a problem is this, and how urgent?

sopel39 (Member) commented Apr 6, 2020

Given that plans change, we should probably benchmark this PR.

rzeyde-varada (Contributor, Author) commented Apr 8, 2020

> I haven't looked at the plans, but is the new plan "better"?

I ran the queries on sf1000, and it seems that the new plan should be better, since the build-side table is smaller than the probe side.

  • The "build side" on q46 (results in 431k rows, ~25MB):
SELECT
  "ss_ticket_number"
, "ss_customer_sk"
, "ca_city" "bought_city"
, "sum"("ss_coupon_amt") "amt"
, "sum"("ss_net_profit") "profit"
FROM
  store_sales
, date_dim
, store
, household_demographics
, customer_address
WHERE ("store_sales"."ss_sold_date_sk" = "date_dim"."d_date_sk")
   AND ("store_sales"."ss_store_sk" = "store"."s_store_sk")
   AND ("store_sales"."ss_hdemo_sk" = "household_demographics"."hd_demo_sk")
   AND ("store_sales"."ss_addr_sk" = "customer_address"."ca_address_sk")
   AND (("household_demographics"."hd_dep_count" = 4)
      OR ("household_demographics"."hd_vehicle_count" = 3))
   AND ("date_dim"."d_dow" IN (6   , 0))
   AND ("date_dim"."d_year" IN (1999   , (1999 + 1)   , (1999 + 2)))
   AND ("store"."s_city" IN ('Fairview'   , 'Midway'   , 'Fairview'   , 'Fairview'   , 'Fairview'))
GROUP BY "ss_ticket_number", "ss_customer_sk", "ss_addr_sk", "ca_city";
  • The build side on q68 (results in 98k rows, ~7.3MB):
SELECT
  "ss_ticket_number"
, "ss_customer_sk"
, "ca_city" "bought_city"
, "sum"("ss_ext_sales_price") "extended_price"
, "sum"("ss_ext_list_price") "list_price"
, "sum"("ss_ext_tax") "extended_tax"
FROM
  store_sales
, date_dim
, store
, household_demographics
, customer_address
WHERE ("store_sales"."ss_sold_date_sk" = "date_dim"."d_date_sk")
   AND ("store_sales"."ss_store_sk" = "store"."s_store_sk")
   AND ("store_sales"."ss_hdemo_sk" = "household_demographics"."hd_demo_sk")
   AND ("store_sales"."ss_addr_sk" = "customer_address"."ca_address_sk")
   AND ("date_dim"."d_dom" BETWEEN 1 AND 2)
   AND (("household_demographics"."hd_dep_count" = 4)
      OR ("household_demographics"."hd_vehicle_count" = 3))
   AND ("date_dim"."d_year" IN (1999   , (1999 + 1)   , (1999 + 2)))
   AND ("store"."s_city" IN ('Midway'   , 'Fairview'));
  • The "probe side" in both queries (results in 12M rows, ~403MB):
SELECT
  "c_last_name"
, "c_first_name"
, "ca_city"
FROM
  customer
, customer_address
WHERE ("c_current_addr_sk" = "ca_address_sk");

rzeyde-varada (Contributor, Author) commented:

Squashed and rebased over the latest master.

martint self-requested a review May 15, 2020
sopel39 (Member) commented May 31, 2020

@martint is it still LGTM? I wanted to run benchmarks on this.

martint (Member) commented May 31, 2020

Yup, please do

sopel39 (Member) commented Jun 10, 2020

Benchmarks: Benchmarks merge.pdf
I'm rerunning them, as it seems odd that merging causes CPU regressions in so many queries.

However, I see two problems with this PR:

  1. Filter rules based on ranges are less accurate than ones based on discrete values (e.g., because for discrete values we use NDVs). This can and should be fixed for int/long types.
  2. Readers (e.g., the ORC reader: TupleDomainOrcPredicate#extractDiscreteValues) use discrete values to filter using Bloom filters. This should be fixed, and I think it could be causing the observed CPU regressions (see the sketch below).
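
For context, a rough sketch of the reader-side problem in (2) - paraphrasing from memory what TupleDomainOrcPredicate#extractDiscreteValues does, with simplified stand-in types (the real code works on io.trino.spi.predicate.Range), so details may differ:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified stand-in for io.trino.spi.predicate.Range.
record SimpleRange(long low, long high)
{
    boolean isSingleValue()
    {
        return low == high;
    }
}

class DiscreteValues
{
    // Discrete values can be extracted only if *every* range is a single
    // value; a merged range such as [1999, 2001] makes this return empty,
    // so a Bloom-filter-based reader can no longer prune by value.
    static Optional<List<Long>> extractDiscreteValues(List<SimpleRange> ranges)
    {
        List<Long> values = new ArrayList<>();
        for (SimpleRange range : ranges) {
            if (!range.isSingleValue()) {
                return Optional.empty();
            }
            values.add(range.low());
        }
        return Optional.of(values);
    }

    public static void main(String[] args)
    {
        List<SimpleRange> discrete = List.of(new SimpleRange(1999, 1999), new SimpleRange(2001, 2001));
        List<SimpleRange> merged = List.of(new SimpleRange(1999, 2001));
        System.out.println(extractDiscreteValues(discrete)); // Optional[[1999, 2001]]
        System.out.println(extractDiscreteValues(merged));   // Optional.empty
    }
}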

rzeyde-varada (Contributor, Author) commented:

Many thanks for the benchmarking!
Would you prefer to fix the problems above in separate PRs?

findepi (Member) commented Jun 11, 2020

> Would you prefer to fix the problems above in separate PRs?

Would that mean we have a regression until those new PRs are done & merged?

rzeyde-varada (Contributor, Author) commented Jun 11, 2020

> Would that mean we have a regression until those new PRs are done & merged?

No, that would indeed be undesirable.
I am suggesting first fixing those issues (in separate PRs), and then testing to make sure this PR doesn't introduce performance regressions before merging it.

sopel39 (Member) commented Jun 15, 2020

Here is another benchmark.

Benchmarks comparison-merge.pdf

> I am suggesting first fixing those issues (in separate PRs), and then testing to make sure this PR doesn't introduce performance regressions before merging it.

Good idea

rzeyde-varada (Contributor, Author) commented:

Marking this PR as draft, will unmark after fixing #4107 and #4108.

rzeyde-varada force-pushed the merge-longs branch 2 times, most recently from 9e42c28 to 63fe21b (November 20, 2021)
rzeyde-varada (Contributor, Author) commented:

Rebased over the latest version of #9868.

rzeyde-varada (Contributor, Author) commented:

Rebased over latest #9868.

rzeyde-varada (Contributor, Author) commented:

#9868 is merged - please take a look at this PR :)

raunaqmorarka (Member) left a comment:

It looks like the current functionality requires multiple flags to unlock.
We will still fall back to min/max DF collection by default when there are many adjacent distinct values, so enable-large-dynamic-filters also has to be enabled to avoid that.
In the current implementation of DFs, connectors already receive the full Domain without simplification. We'll save some memory and network communication costs with a more compact representation in Domain. But are these efficiency gains the main benefit, or is this more about preventing the Domain#simplify call in connectors from losing the granularity of information in the received Domain?
Would these changes make it worthwhile to raise the default DF thresholds in DynamicFilterConfig?

/**
 * Try to return a more compact representation (if possible).
 */
default ValueSet compact()
A reviewer (Member) commented:

The concept of compacting ranges seems very specific to SortedRangeSet and not easily applicable to every type of ValueSet.
So it seems more suitable to put this in the Ranges interface and use it through ValuesProcessor where needed.

A reviewer (Member) commented:

> The concept of compacting ranges seems very specific to SortedRangeSet

Agreed.

Note, however, that the interface

/**
 * Try to return a more compact representation (if possible).
 */
ValueSet compact();

is not range-specific and could live in ValueSet.
The current name is getCompactedRanges, but it is still a ValueSet-level operation, which could potentially be extended to other ValueSet types.

I think the important question to ask is: what does "compact" actually mean?

  • occupies less memory? (e.g., SortedRangeSet uses a dictionary for singleton ranges, but only if constructed explicitly as such)
    • in this case, it's not ranges-specific
  • fewer ranges?
    • in this case, it is ranges-specific, but then why does it return a ValueSet, losing the information that it's ranges?

private static boolean areConsecutive(Type type, Object low, Object high)
{
    return type
            .getDiscreteValues(new Type.Range(low, high))
A reviewer (Member) commented:

Can we use a comparison operator to detect adjacent values, the way tryMergeWithNext does?
That would avoid relying on getDiscreteValues, which isn't implemented for many types.

rzeyde-varada (Contributor, Author) commented:

Not sure - IIUC, tryMergeWithNext() relies on the ranges sharing the low/high bound values, i.e., [1,5] and (5,9] can be merged into [1,9].
However, if we try to merge [1,5] and [6,9], we have to make sure that 5 and 6 are "consecutive" (i.e., there is no other value between them), and this requires calling Type#getDiscreteValues.
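
A minimal sketch of that consecutiveness check (hypothetical helper over plain longs, not the PR's exact code): two values are consecutive exactly when the closed interval between them contains only the two endpoints.

import java.util.stream.LongStream;

class Consecutive
{
    // low and high are consecutive when [low, high] contains exactly two
    // discrete values. enumerate() stands in for Type#getDiscreteValues,
    // which streams the discrete values of a range for supporting types;
    // limit(3) keeps the check cheap even for huge intervals.
    static boolean areConsecutive(long low, long high)
    {
        return enumerate(low, high).limit(3).count() == 2;
    }

    private static LongStream enumerate(long low, long high)
    {
        return LongStream.rangeClosed(low, high);
    }

    public static void main(String[] args)
    {
        System.out.println(areConsecutive(5, 6)); // true
        System.out.println(areConsecutive(5, 8)); // false
    }
}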

A reviewer (Member) commented:

Yes, we can't use logic similar to tryMergeWithNext here.
But re-using Type#getDiscreteValues also seems awkward.
I'm thinking we should just add a boolean areConsecutive(Object low, Object high) to Type for this.
@findepi @martint thoughts?

A reviewer (Member) commented:

While working on Iceberg I had a need to find an adjacent value (not to check whether two values are adjacent).
Here is my API proposal for that: #12797

Here, it seems we don't need new methods on Type, so I'd hold off and not add them.

rzeyde-varada (Contributor, Author) commented:

Sorry for the delayed response - I fixed the issues and updated the PR.

> But are these efficiency gains the main benefit, or is this more about preventing the Domain#simplify call in connectors from losing the granularity of information in the received Domain?

We would prefer not to call Domain#simplify, so that the DF can be applied efficiently.
This way, our connector gets an "equivalent" predicate that can be evaluated much faster than a large list of discrete values; see the sketch below.
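
To illustrate the cost difference, a toy comparison (my example, not connector code): a merged range needs two comparisons per row, while a large discrete list needs a per-row set probe (or worse, an OR chain over all values, for engines that expand IN lists).

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class FilterCost
{
    // Merged range: two comparisons per value, no auxiliary structure.
    static boolean matchesRange(long value, long low, long high)
    {
        return low <= value && value <= high;
    }

    // Large discrete list: a hash set must be built and probed per value.
    static boolean matchesSet(long value, Set<Long> values)
    {
        return values.contains(value);
    }

    public static void main(String[] args)
    {
        Set<Long> discrete = LongStream.rangeClosed(1, 1_000_000)
                .boxed()
                .collect(Collectors.toSet());
        System.out.println(matchesRange(500_000, 1, 1_000_000)); // true
        System.out.println(matchesSet(500_000, discrete));       // true
    }
}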

rzeyde-varada force-pushed the merge-longs branch 2 times, most recently from 5a77f95 to 5ff4c69 (February 4, 2022)

Comment on lines +33 to +35
 * Try to return a more compact representation (if possible).
 */
ValueSet getCompactedRanges();
A reviewer (Member) commented:

Document whether the compacted version is supposed to:

  • be equal (as in Object.equals)
  • contain the same information, or whether it can e.g. be slightly lossy (widening) for more efficient compaction
  • Also: what is returned when compaction is not possible?

A reviewer (Member) commented:

Also, at a higher level:

  • why is this a method on Ranges instead of on ValueSet?
  • why do we want such a method at all? I would hope the ValueSet instance is always "as compact as possible"

rzeyde-varada (Contributor, Author) commented:

> Document whether the compacted version is supposed to:

Sounds good - done.

rzeyde-varada (Contributor, Author) commented:

> why is this a method on Ranges instead of on ValueSet?

Following the suggestion in the comment above (#3316 (comment)).

> why do we want such a method at all? I would hope the ValueSet instance is always "as compact as possible"

I agree, but such compaction may cause the CBO to change the plan (since we estimate statistics differently depending on whether the predicate is a range or a discrete set of values), so I preferred to keep the existing behavior and add predicate compaction only to DF-related code.

A reviewer (Member) commented:

> I agree, but such compaction may cause the CBO to change the plan (since we estimate statistics differently depending on whether the predicate is a range or a discrete set of values)

Is this something that could be improved on?

cc @sopel39

A reviewer (Member) commented:

> Is this something that could be improved on?

IMO it should be; I don't think there should be code duality (DF vs. CBO).

Enable it for integral types, to be used in DF.
colebow (Member) commented Oct 19, 2022

👋 @rzeyde-varada - this PR has become inactive. If you're still interested in working on it, please let us know, and we can try to get reviewers to help with that.

We're working on closing out old and inactive PRs, so if you're too busy or this has too many merge conflicts to be worth picking back up, we'll be making another pass to close it out in a few weeks.

colebow closed this Nov 11, 2022
Successfully merging this pull request may close these issues:

  • Simplify discrete domains for NOT IN