
Rework GroupByHash for faster performance and support grouping by nulls #790

Closed
alamb opened this issue Jul 28, 2021 · 48 comments · Fixed by #808

@alamb
Contributor

alamb commented Jul 28, 2021

Rationale

  1. The current GroupByHash operator does not take NULL into account and thus produces incorrect answers when grouping on columns that contain NULL, as described in Wrong results when grouping on a column with NULLs #782 and Wrong results when grouping with dictionary arrays with nulls  #781.
  2. Without additional changes, adding support for NULL in grouping will likely both slow down group by hashing and increase the memory overhead per group (e.g. see Remove GroupByScalar and use ScalarValue in preparation for supporting null values in GroupBy #786)

This ticket therefore proposes rearranging the GroupByHash code to be more efficient in both space and time, providing a performance budget to add NULL support without an overall regression.

Overview of Current GroupByHash

This section explains the current state of GroupByHash on master at 5416341

At a high level, the group by hash does the following for each input row, in a vectorized fashion:

  1. Compute the group by key values (the expressions that appear in the GROUP BY clause)
  2. Form a key out of the group by values
  3. Find/create an entry in the hash map of (key values) --> (accumulators)
  4. Update the accumulators (one for each aggregate in the query, such as COUNT) with the arguments

When all the input has been processed, the hash table is drained, producing one row for each entry in the hash table, in the following manner:

(group key1, group key2, ...) (aggregate1, aggregate2, ...)
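Put together, a rough sketch of this flow (with simplified types and a single hard-coded SUM accumulator -- the real implementation is vectorized over Arrow arrays):

use std::collections::HashMap;

// Simplified stand-ins: the packed key bytes from step 2 and a single
// accumulator for SUM.
type Key = Vec<u8>;
struct SumAccumulator { sum: i64 }

impl SumAccumulator {
    fn update(&mut self, v: i64) { self.sum += v; }
}

// `keys` holds the packed group key per input row, `args` the aggregate
// argument per row (e.g. c1 for SUM(c1)).
fn group_by_hash(keys: Vec<Key>, args: Vec<i64>) -> Vec<(Key, i64)> {
    let mut table: HashMap<Key, SumAccumulator> = HashMap::new();
    for (key, arg) in keys.into_iter().zip(args) {
        // step 3: find or create the entry for this group
        let acc = table.entry(key).or_insert(SumAccumulator { sum: 0 });
        // step 4: update the accumulator with the argument
        acc.update(arg);
    }
    // drain: one output row per hash table entry
    table.into_iter().map(|(k, a)| (k, a.sum)).collect()
}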

So for example, given a query such as

SELECT SUM(c1)
FROM t
GROUP BY k1, abs(k2)

This looks something like

 ┌────┐ ┌────┐               ┌────┐ ┌────┐         ┌─────────────┐
 │    │ │    │               │    │ │    │────────▶│  key 1234   │
 │    │ │    │               │    │ │    │         └─────────────┘
 │    │ │    │               │    │ │    │
 │    │ │    │               │    │ │    │         ┌─────────────┐
 │    │ │    │               │    │ │    │────────▶│   key 23    │
 │    │ │    │               │    │ │abs │         └─────────────┘
 │ k1 │ │ k2 │──────────────▶│ k1 │ │(k2)│
 │    │ │    │               │    │ │    │
 │    │ │    │               │    │ │    │        ...
 │    │ │    │               │    │ │    │
 │    │ │    │               │    │ │    │
 │    │ │    │               │    │ │    │         ┌─────────────┐
 │    │ │    │               │    │ │    │────────▶│   key 321   │
 └────┘ └────┘               └────┘ └────┘         └─────────────┘
                              group by key
  input data                     values
                  step 1:    (group_values)        step 2:
                 evaluate               create a variable sized hash
                 gby exprs                 table key for each row

The hash table that is formed looks something like this:

┌───────────────┐         ┌──────────────────┬────────────┬─────────────────┐
│┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
│└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
│               │         └──────────────────┴────────────┴─────────────────┘
│               │
│     ...       │
│               │                  ...
│               │
│               │
│               │
│               │         ┌──────────────────┬────────────┬─────────────────┐
│┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
││   key 321   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
│└─────────────┘│         │  formed key 321  │            │ (scratch space) │
└───────────────┘         └──────────────────┴────────────┴─────────────────┘
  hash table
"accumulators"


           Step 3:                   NOTE:  Each entry in the hash table has
 The keys are used to find an        1. The original group keys
   entry in the hash table           2. The accumulators
    which then are mapped            3. row_indexes scratch space
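In (hypothetical, simplified) Rust terms, each entry of the table sketched above is roughly:

use std::collections::HashMap;

// Placeholder types standing in for the real ones described above.
struct GroupByScalar;   // owned copy of one group key value
trait Accumulator {}    // e.g. the accumulator for SUM

// packed key bytes --> (original group keys, accumulators, row_indices scratch)
type GroupTable =
    HashMap<Vec<u8>, (Vec<GroupByScalar>, Vec<Box<dyn Accumulator>>, Vec<u32>)>;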

Key Formation

The current state of the art, introduced in 93de66a / apache/arrow#8863 by @Dandandan, is quite clever. The code in create_key packs data from the group keys together into a single mutable Vec<u8>, which is then used as the key for the hash table.

For example, if the input row was:

{
  k1: "foo"
  k2: 0x1234 as u16
}

The resulting key is a 13 byte Vec<u8>: 11 bytes for "foo" (8 bytes for the length + 3 bytes for "foo") and 2 bytes for 0x1234, a 16 bit integer:

                        │        │
     string len                      0x1234
    (as usize le)       │  "foo" │    as le
┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
│00│00│00│00│00│00│00│03│"f│"o│"o│34│12│
└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘   byte
 0  1  2  3  4  5  6  7 │8  9  10│11 12   offset

                        │        │
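A minimal sketch of this packing (illustrative only; the real create_key handles arbitrary group columns):

// Pack one row's group key values into a single byte vector: strings are
// length-prefixed (usize, little-endian) and integers are appended in
// little-endian byte order.
fn pack_key(k1: &str, k2: u16) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(&k1.len().to_le_bytes()); // 8 bytes: string length
    key.extend_from_slice(k1.as_bytes());           // 3 bytes: "foo"
    key.extend_from_slice(&k2.to_le_bytes());       // 2 bytes: 0x1234
    key
}

// pack_key("foo", 0x1234) yields exactly the 13-byte key shown above.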

However, there are at least a few downsides of this approach:

  1. There is no way to represent NULL as mentioned by @Dandandan on Wrong results when grouping on a column with NULLs #782 (comment)
  2. The data for each group key value is currently stored twice -- once in the Vec<u8> key and once in the values as a GroupByScalar used to produce the output -- resulting in memory overhead, especially for variable length (e.g. string) values

Proposal

Modeled after what I think @Dandandan is suggesting in #786 (comment):

The HashTable would not store the key or aggregate accumulators directly, but instead would map "signatures" computed from the group by keys to list offsets in a mutable storage area that contained the values and aggregates.

The "signature" is simply a hash of the values (and validitiy bit) of the group key values. The term "signature" is used to avoid confusion with the hash used in the hash table. It would be computed as a u64 or similar directly from the group by key values

┌────┐ ┌────┐         ┌─────────────┐
│    │ │    │────────▶│   0x1133    │
│    │ │    │         └─────────────┘
│    │ │    │         ┌─────────────┐
│    │ │    │────────▶│   0x432A    │
│    │ │    │         └─────────────┘
│    │ │abs │
│ k1 │ │(k2)│
│    │ │    │         ...
│    │ │    │
│    │ │    │
│    │ │    │
│    │ │    │         ┌─────────────┐
│    │ │    │────────▶│   0x432A    │
└────┘ └────┘         └─────────────┘
 group by key
    values
(group_values)

Step 2: Create a FIXED LENGTH signature
  (e.g. u64) by hashing the values in
            the group by key

The hashtable composition would be different. Each entry is a SmallVec (a Vec that avoids allocation for short lists) containing a list of indices into a "mutable storage" area

┌───────────────┐
│┌─────────────┐│    ┌───────┐                       ┌──────┐┌──────┐ ┌────────────┐
││   0x1133    ├┼───▶│  [1]  │─ ─ ─ ─ ─ ─            │      ││      │ │            │
│└─────────────┘│    └───────┘           └ ─ ─ ─ ─ ─ │      ││      │ │            │
│               │                                    │  k1  ││ abs  │ │            │
│               │                         ─ ─ ─ ─ ─ ▶│values││ (k2) │ │            │
│     ...       │                        │           │      ││values│ │Accumulators│
│               │       ...                          │ (and ││      │ │  for SUM   │
│               │                        │           │valid ││ (and │ │            │
│               │                         ─ ─ ─ ─ ─ ▶│mask) ││valid │ │            │
│               │                        │           │      ││mask) │ │            │
│               │                                    │      ││      │ │            │
│┌─────────────┐│    ┌───────┐           │           │      ││      │ │            │
││   0x432A    │├───▶│ [2,4] │─ ─ ─ ─ ─ ─            └──────┘└──────┘ └────────────┘
│└─────────────┘│    └───────┘
└───────────────┘               values are lists
 keys are gby key               (SmallVec) of         mutable storage
 signatures                     offsets into
                                storage tables
hashtable

Edit: Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable storage area -- if there are multiple values in that list each entry in the mutable area needs to be checked for equality to find the correct one.

The mutable storage area contains:

  1. A Vec of ScalarValues for each group key column
  2. The Vec of accumulators for each grouping

For example, this is how (logically) the mutable storage could work:

        valid          valid
         bit            bit
        mask           mask
 ┌────┐┌────┐   ┌────┐┌────┐   ┌────┐
 │"D" ││ t  │   │ 1  ││ t  │   │ 11 │
 ├────┤├────┤   ├────┤├────┤   ├────┤
 │"C" ││ t  │   │ 3  ││ t  │   │ 3  │
 ├────┤├────┤   ├────┤├────┤   ├────┤
 │"A" ││ t  │   │ 1  ││ t  │   │ 27 │
 ├────┤├────┤   ├────┤├────┤   ├────┤
 │"D" ││ t  │   │ 2  ││ t  │   │ 2  │
 ├────┤├────┤   ├────┤├────┤   ├────┤
 │ "" ││ t  │   │ 0  ││ f  │   │ 4  │
 └────┘└────┘   └────┘└────┘   └────┘

   group by key storage      Accumulator
        (5 groups)             for SUM
                            (aggregates)


   Example showing how groups
  (D,1), (C,3), (A,1), (D,2),
      (NULL, 0) are stored

I propose using Vec<ScalarValue> to store the group key values in the mutable area, as there is no equivalent of a mutable Array in arrow-rs yet (though I think there is MutablePrimitiveArray in arrow2). If/when we get access to a mutable array in datafusion, we can potentially switch to that representation for the mutable storage area, which would likely take less memory for some data types and also allow faster output generation.
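In (illustrative) Rust, assuming the smallvec crate and placeholder types for ScalarValue and the accumulators, the proposed structures might look like:

use std::collections::HashMap;
use smallvec::SmallVec; // assumed dependency

// Placeholders for datafusion's ScalarValue and accumulator types.
enum ScalarValue { Int64(Option<i64>), Utf8(Option<String>) }
trait Accumulator {}

// The mutable storage area: group key values stored as Vec<ScalarValue>
// per group key column (validity lives inside ScalarValue), plus one set
// of accumulators per group.
struct MutableStorage {
    group_values: Vec<Vec<ScalarValue>>,          // one Vec per group key column
    accumulators: Vec<Vec<Box<dyn Accumulator>>>, // one Vec per group
}

// signature (u64 hash of the group key values) --> group indices; a
// SmallVec because collisions are rare, so the list is almost always a
// single index.
struct GroupState {
    map: HashMap<u64, SmallVec<[usize; 2]>>,
    storage: MutableStorage,
}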

Alternatives considered

One alternative that would require fewer changes but be slightly slower would be to append a validity bitmap to the end of both the keys and values in the hash table. For example:

      Alternate Design
 ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
 │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
 ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
 │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
 │               │         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
 │               │
 │     ...       │
 │               │                  ...
 │               │
 │               │
 │               │
 │               │         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
 │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
 ││  key 3211   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
 │└─────────────┘│         │ formed key 3211  │            │ (scratch space) │
 └───────────────┘         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝

And the keys would have a null bitmask appended on the end:

                                           │        │
                        string len                      0x1234
                       (as usize le)       │  "foo" │    as le
{                  ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═
  k1: "foo"        │00│00│00│00│00│00│00│03│"f│"o│"0│34│12│00║◀ ─ ─
  k2: 0x1234u16    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═      │
}                   0  1  2  3  4  5  6  7 │8  9  10│11 12 13
                                                                   │
                                           │        │
                                                                   │

                                                            New bitmask at
                                                             end of each
                                                                 key

@Dandandan
Contributor

Dandandan commented Jul 28, 2021

Yes, this (great) explanation / overview matches my suggestion. Thanks for the write up!! Somehow the ASCII art doesn't render well here?

A couple of points:

  • Great idea to use a Vec<ScalarValue> first. It seems it should be quite easy to swap that for a typed / mutable array alternative later, and it probably involves fewer changes now

  • To hash the group by values it can re-use the create_hashes function from the hash join. This way it will also benefit from improvements there, and the hashes are calculated on (multiple) Arrays directly.

  • Before implementing this proposal, I think it is possible, with minimal changes, to also include a null part in the hashmap key, so the key would become something like (bool, Vec<u8>). A null key would have (false, vec![]) as its value; an actual value would contain true and the bytes of the contents.
    That will be a bit slower and consume more memory, but at least it allows grouping on nulls too.

@jorgecarleitao
Member

Great proposal.

From the hashing side, an unknown to me atm is how to efficiently hash values+validity. I.e. given V = ["a", "", "c"] and N = [true, false, true], I see some options:

  • hash(V) ^ !N + unique * N where unique is a unique sentinel value exclusive for null values. If hash is vectorized, this operation is vectorized (see the sketch after this list).

  • concat(hash(value), is_valid) for value, is_valid in zip(V,N)

  • split the array between nulls and not nulls, i.e. N -> (non-null indices, null indices), perform hashing over valid indices only, and then, at the very end, append all values for the nulls. We do this in the sort kernel, to reduce the number of slots to perform comparisons over.
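For illustration, a branch-free sketch in the spirit of the first option (using a multiply-select and a made-up sentinel constant rather than the exact XOR formula):

// Arbitrary sentinel hash assigned to all null slots (illustrative).
const NULL_SENTINEL: u64 = 0x9e37_79b9_97f4_a7c5;

// Combine precomputed value hashes with a validity mask: valid rows keep
// hash(v); null rows all map to the same sentinel. The select is
// branch-free, so the loop stays vectorizable.
fn hash_with_validity(value_hashes: &[u64], validity: &[bool]) -> Vec<u64> {
    value_hashes
        .iter()
        .zip(validity)
        .map(|(&h, &valid)| {
            let m = valid as u64;             // 1 if valid, 0 if null
            h * m + NULL_SENTINEL * (1 - m)   // pick h or the sentinel
        })
        .collect()
}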

If we could write the code in a way that we could "easily" switch between implementations (during dev only, not a conf parameter), we could bench whether one wins over the other, or under which circumstances.

Regardless, nulls in the group by are so important that IMO any of these is +1 at this point xD

@alamb
Contributor Author

alamb commented Jul 28, 2021

Before implementing this proposal, I think it is possible, with minimal changes, to also include a null part in the hashmap key, so the key would become something like (bool, Vec<u8>) as hashmap key

@Dandandan I think this is a version of what I was trying to explain in the "Alternatives considered" section. I think we need an entire validity mask (as only some of the group keys might be null). Also, we would need to keep validity in the hash map's values to produce the correct output, but we could get that "for free" by switching to ScalarValue ala #788

@alamb
Contributor Author

alamb commented Jul 28, 2021

From the hashing side, an unknown to me atm is how to efficiently hash values+validity. I.e. given V = ["a", "", "c"] and N = [true, false, true],

@jorgecarleitao -- excellent point. I have some idea of how to potentially benchmark these / keep the code separate to allow switching in different implementations. Thank you for the options (I was planning on doing the first thing you suggested, but it is good to think about the others).

@Dandandan
Contributor

Before implementing this proposal, I think it is possible, with minimal changes, to also include a null part in the hashmap key, so the key would become something like (bool, Vec<u8>) as hashmap key

@Dandandan I think this is a version of what I was trying to explain in the "Alternatives considered" section. I think we need an entire validity mask (as only some of the group keys might be null). Also, we would need to keep validity in the hash map's values to produce the correct output, but we could get that "for free" by switching to ScalarValue ala #788

Yes, that's right. I missed that part in the alternatives, but that should work I think 👍

@jhorstmann
Contributor

One thing I'd like to benchmark is the storing of indices as part of the map values. I see that this can make updating the accumulators more efficient since we can use update_batch instead of update(ScalarValue), but collecting those indices, using the take kernel and array.slice also adds some additional overhead, especially with a larger number of groups. So I'm wondering whether directly updating the accumulators row by row would be much simpler and also not much slower.

The overhead in that approach would be in constructing and matching ScalarValue. All Accumulators except DistinctCount only work with numbers as input, so in most cases that wouldn't require allocation. Overhead could maybe be further reduced by creating typed accumulators (via a macro). Instead of a SumAccumulator that has to match on all possible variants there could be separate Float64Sum, Int64Sum, UInt64Sum and so on. Not sure how much benefit that would bring though.

I have been experimenting with another, completely different approach for some time, although with a much simplified type system and without considering null values yet. It would require access to the whole input data and might be difficult to adapt to work with batches.

The main idea is to use a hashmap that only maps input row numbers to consecutive integers that identify which group each row belongs to. So for an input like [a, b, b, c, b, a] these indices would be [0, 1, 1, 2, 1, 0]. We also keep track of the indices where each group first appeared in the input, which would be [0, 1, 3] in this example; this can then be used to create the arrays for the group by columns. In this example there is only one group by column, but it also works with multiple. Ideally the hash values for the group by arrays would be calculated up front in a vectorized way; the biggest overhead then is the equals function, which has to look up two indices in the group by arrays and compare their values.

Once the indices are calculated as described above, we can do the aggregation. For each aggregate there is a generically typed implementation that takes the input array for that aggregation and the indices as parameters. It creates a typed Vec of accumulators, with length corresponding to the number of groups, and then iterates through indices and inputs, updating the accumulator at each index. This does a lot of random access, but there are no runtime typechecks or dynamic dispatch needed inside the inner loop.

Another benefit of the approach is that there can be separate optimized versions for the indices part, for example when grouping by a single dictionary encoded array.

@jhorstmann
Contributor

More concretely regarding the proposal, how exactly the signature works is still a bit unclear to me. If it has a fixed length and is calculated somehow, then there is a possibility of collisions. If it is supposed to be an integer sequence then we'd need another hashmap to create it.

To fix the immediate problem of null values, I would try to encode them inline into the Vec<u8>, for example by prepending a 0 or 1 byte before the bytes of each group by column. The bytes for non-valid entries then also need to be set to a default value for that type. Maybe there is a smart way to omit the bytes for these non-valid entries, but care is needed to ensure no two different keys get the same encoded bytes. A sketch of this encoding is shown below.
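For illustration, a minimal sketch of that inline encoding for a single nullable u16 column (not actual DataFusion code):

// Prepend one validity byte per group-by column, followed by the value
// bytes; null slots write a fixed default payload so that no two
// distinct keys can produce identical encoded bytes.
fn append_u16_column(key: &mut Vec<u8>, value: Option<u16>) {
    match value {
        Some(v) => {
            key.push(1);                                // 1 = valid
            key.extend_from_slice(&v.to_le_bytes());    // the actual bytes
        }
        None => {
            key.push(0);                                // 0 = null
            key.extend_from_slice(&0u16.to_le_bytes()); // default payload
        }
    }
}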

I think in the current group by implementation, this vector is fully responsible for the equals and hashcode. That means the GroupByScalar implementation for Eq and Hash are not really used, and we could replace that with ScalarValue. The creation of this ScalarValue is already only happening when a new entry needs to be inserted into the hashmap.

@alamb
Contributor Author

alamb commented Jul 28, 2021

More concretely regarding the proposal, how exactly the signature works is still a bit unclear to me. If it has a fixed length and is calculated somehow, then there is a possibility of collisions.

@jhorstmann the idea was that the "signature" is just a hash of the values. Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable area -- if there are multiple values in that list, each entry in the mutable area needs to be checked for equality to find the correct one. This was not very clear in the writeup and I apologize.

To fix the immediate problem of null values, I would try to encode them inline into the Vec<u8>,

I agree this is a good plan (and I think it is similar to what I was trying to describe in the alternative section). I plan to try and code this version shortly so we have something to compare against.

That means the GroupByScalar implementation for Eq and Hash are not really used, and we could replace that with ScalarValue.

Indeed -- I even have a PR which proposes exactly such a change: #786 :)

@rdettai
Contributor

rdettai commented Jul 29, 2021

Nice write-up and very interesting discussions!

  • By feeding the signature as a key to the HashMap, are we not hashing the original key twice? I guess this can easily be solved by setting the identity function instead of the default hasher on the HashMap 😃
  • Too bad we cannot index into the builder; that would allow us to build the target arrow columns straight away. Can't wait for the switch to arrow2 😉

@Dandandan
Contributor

Dandandan commented Jul 29, 2021

Nice write-up and very interesting discussions!

  • By feeding the signature as a key to the HashMap, are we not hashing the original key twice? I guess this can easily be solved by setting the identity function instead of the default hasher on the HashMap 😃

Yes, that's also what we currently do for the hash join algorithm. It's a small performance win. It also avoids the higher re-hashing cost when growing the hashmap.
The cost of hashing a u64 was already far smaller, though, than hashing / re-hashing a complex nested key.

I believe a hashmap could also be implemented manually using a Vec and a number of buckets; when I tried, it was slower, I think because the HashMap itself is quite fast for collision checks (maybe when speeding up / vectorizing that part I will try this again).
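For reference, an identity "hasher" for such pre-hashed u64 keys could look roughly like this (a sketch, not the actual hash join code):

use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

// Passes an already-computed u64 hash straight through instead of
// re-hashing it.
#[derive(Default)]
struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn finish(&self) -> u64 { self.0 }
    fn write(&mut self, _bytes: &[u8]) {
        unimplemented!("IdentityHasher only supports u64 keys")
    }
    fn write_u64(&mut self, n: u64) { self.0 = n; }
}

// A map keyed by precomputed hashes that skips hashing them again.
type PreHashedMap<V> = HashMap<u64, V, BuildHasherDefault<IdentityHasher>>;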

@Dandandan
Contributor

Two related issues that might be resolved after implementing this (with some discussion):

#418
#26

@alamb alamb changed the title Rework GroupByHash to support grouping by nulls Rework GroupByHash to for faster performance and support grouping by nulls Aug 1, 2021
@alamb alamb changed the title Rework GroupByHash to for faster performance and support grouping by nulls Rework GroupByHash for faster performance and support grouping by nulls Aug 1, 2021
@jhorstmann
Contributor

Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable area -- if there are multiple values in that list, each entry in the mutable area needs to be checked for equality to find the correct one.

I don't really see how we can do these equality checks faster than the hashmap itself could, but I would be very happy to be proven wrong.

For optimized eq and hash handling, hashbrown has some more low-level functionality that we are not yet using:

  • from_hash(hash, eq_fn): allows looking up an entry with a given hash and equals function, bypassing the hashbuilder of the map. This allows implementing eq on values that are not directly stored in the map, for example if only an index into another structure is part of the key.
  • insert_with_hasher(hash, key, value, hasher): Again allows bypassing the hashbuilder and looking up the hash value in some other structure. The hasher parameter is only called if the table needs rehashing.

@alamb
Contributor Author

alamb commented Aug 2, 2021

This allows implementing eq on values that are not directly stored in the map, for example if only an index into another structure is part of the key

Thank you for the pointers @jhorstmann -- I think this is basically exactly what the design is trying to do. I will study the hashbrown api some more and see if I can use it.

@Dandandan
Contributor

Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable area -- if there are multiple values in that list, each entry in the mutable area needs to be checked for equality to find the correct one.

I don't really see how we can do these equality checks faster than the hashmap itself could, but I would be very happy to be proven wrong.

For optimized eq and hash handling, hashbrown has some more low-level functionality that we are not yet using:

  • from_hash(hash, eq_fn): allows looking up an entry with a given hash and equals function, bypassing the hashbuilder of the map. This allows implementing eq on values that are not directly stored in the map, for example if only an index into another structure is part of the key.
  • insert_with_hasher(hash, key, value, hasher): Again allows bypassing the hashbuilder and looking up the hash value in some other structure. The hasher parameter is only called if the table needs rehashing.

from_hash might indeed be interesting to explore.

Some more info on why collisions might not be that bad, even with an alternative approach:

  • Based on good quality u64 hashes, I found the remaining number of collisions to be very low, as most of the collisions (with different u64 values but within the same hashmap "bucket") are already handled in the hashmap implementation. I had to implement some tests with fixed hashes to trigger hash collisions in the hash join algorithm, as the current approach already gave the correct answers on all of the unit / integration tests without detecting collisions, i.e. each key maps to a different u64 hash value in the tests.
  • For hash join there are some known ways to handle hash collisions in a vectorized way, e.g. see this article: https://www.cockroachlabs.com/blog/vectorized-hash-joiner/ . This might be similar for hash aggregations.

@alamb
Contributor Author

alamb commented Aug 4, 2021

Summary

TLDR; If it is ok to change create_hashes to use std::hash::Hasher, to be consistent with ScalarValue::hash, I believe we can avoid "signatures" and use hashbrown's HashMap directly. This will:

  1. Handle hash collisions without any datafusion specific code
  2. Hash key values once for each input row (and once for each new group key)
  3. Copy key values once for each group

Discussion

Based on the great suggestion from @jhorstmann, I spent some time playing with hashbrown's from_hash, to try and:

  1. Avoid double hashing as mentioned by @rdettai Rework GroupByHash for faster performance and support grouping by nulls #790 (comment)
  2. Let hashbrown handle collisions, as mentioned by @jhorstmann: Rework GroupByHash for faster performance and support grouping by nulls #790 (comment)
  3. Avoid the storage overhead of storing a synthetic key / signature

The approach was to create a HashMap keyed off a list of ScalarValues, similar to the following (we can't actually use Vec<ScalarValue> directly, as explained later):

HashMap<Vec<ScalarValue>, GroupByState>

And then use the hashbrown API to avoid creating Vec<ScalarValue> values on each row to look up the correct entry.

I made a cut down example showing how this works here: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3c5e4356d83b3a5e2221dc82eb3f4ebf.

In order to use this approach, the following changes are needed, as shown in the example:

  1. Change create_hashes to be consistent with the impl std::hash::Hash for ScalarValue (more details below)
  2. Add a function to compare the values of &[ArrayRef] at a row with the values of &[ScalarValue] without creating ScalarValues
  3. Add a wrapper around [ScalarValue] so that we can ensure the hash value is consistent with the hash value created from the arrays.

The latter two are straightforward. The first may require some discussion.
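For example, the second change might look roughly like this (a sketch, assuming the ScalarValue::eq_array helper that is discussed later in this thread):

use arrow::array::ArrayRef;
use datafusion::scalar::ScalarValue;

// Compare row `row` of the group key arrays against an existing group's
// owned ScalarValues, without materializing new ScalarValues.
fn group_key_eq(arrays: &[ArrayRef], row: usize, group_key: &[ScalarValue]) -> bool {
    group_key
        .iter()
        .zip(arrays)
        .all(|(scalar, array)| scalar.eq_array(array, row))
}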

Consistent Hashing

Rationale

Sadly, even when using the with_hash hashbrown API, the hash value of the key is computed with std::hash::Hash. This means the implementation of create_hashes must be consistent with ScalarValue::hash.

You can see the problem by switching to create_hash_existing in the example (which mirrors how create_hashes works today), in which case the matching key is not found in the hash table and the group for (1, 10) is not aggregated.

Changes needed

To illustrate the change that is needed, here is a highly simplified version of how create_hashes is implemented today:

fn create_hash_existing(arrays: &[Array], row: usize, random_state: &RandomState) -> u64 {
    let mut hash: u64 = 0;
    for array in arrays {
        // hash each column's value at `row` on its own ...
        let row_hash = u64::get_hash(&array.values[row], random_state);
        // ... then fold the per-column hashes together
        hash = combine_hashes(hash, row_hash);
    }
    hash
}

Here is a consistent version that uses the std::hash::Hasher API:

fn create_hash_working(arrays: &[Array], row: usize, random_state: &RandomState) -> u64 {
    // one Hasher per row, fed all column values in sequence -- the same
    // thing std::hash::Hash does for a struct's fields
    let mut hasher = random_state.build_hasher();
    for array in arrays {
        hasher.write_u64(array.values[row]);
    }
    hasher.finish()
}

We also need to sort out how to hash null values, which are handled differently in create_hashes (not hashed) and ScalarValue (hashed). @jorgecarleitao has some good thoughts on the topic of hashing nulls here: #790 (comment)

@alamb
Contributor Author

alamb commented Aug 4, 2021

@Dandandan if you have a moment, I would like to know if you have any concerns with the "change create_hashes" function item above, before I spend significant time on it

@jhorstmann
Contributor

Thanks, great writeup and nice example. Consistent hashes between Arrays and Scalars sound very nice, but I think it will require an extensive test suite for all data types so it doesn't get broken accidentally.

An alternative could be to store the calculated hash also in the GroupKey, but that would increase the size of each key and decrease cache locality of the map. Rehashing would become faster, but that operation would ideally be quite rare.

For reference, my experiments with grouping by mapping each input row to a consecutive integer used hashbrown like this:

fn hash_to_group_indices(
    columns: &[Column],
    len: usize,
    group_by: &[usize],
) -> (Vec<usize>, Vec<usize>) {
    let mut hashes: Vec<u64> = vec![0; len];
    hash(columns, group_by, hashes.as_mut_slice());

    // row -> group idx
    let mut map: HashMap<usize, usize> = HashMap::with_capacity(DEFAULT_MAP_CAPACITY);

    // index of the group that each row belongs to
    let indices: Vec<usize> = hashes.iter().enumerate().map(|(i, hash)| {
        let next = map.len();

        match map.raw_entry_mut().from_hash(*hash, |j| columns_eq(columns, group_by, i, *j)) {
            RawEntryMut::Occupied(entry) => *entry.get(),
            RawEntryMut::Vacant(entry) => {
                *entry.insert_with_hasher(*hash, i, next, |j| hashes[*j]).1
            }
        }
    }).collect();

    // index where each group first appeared in the input 
    let mut key_indices = map.keys().copied().collect::<Vec<usize>>();
    key_indices.sort_unstable();

    (indices, key_indices)
}

Since the hashmap key is just an index into the input columns, insert_with_hasher can look up the hash value based on that index. The hash value for the new entry is provided as a parameter; the hasher closure is only used for rehashing. The example requires access to the whole data though, so unfortunately this won't work when processing a batch at a time.

There is also a method insert_hashed_no_check, which allows specifying the hash value for the new entry, but would still use the default hash implementation for rehashing. That would remove one hash calculation in your code.

@alamb
Contributor Author

alamb commented Aug 4, 2021

Consistent hashes between Arrays and Scalar sound very nice but I think will require an extensive test suite for all data types so it doesn't get broken accidentally.

I agree the testing would be key here, and I am willing to write such tests.

An alternative could be to store the calculated hash also in the GroupKey,

I tried to do this but I couldn't figure out how to do so using the std::hash::Hash API. I didn't find any way to return the hash value directly, only to update the intermediate value of a Hasher.

insert_hashed_no_check is a good one, though I think it still requires consistency between create_hashes and ScalarValue::hash for the case of collisions, right?

For reference, my experiments with grouping by mapping each input row to a consecutive integer used hashbrown like this:

Yes that is a cool idea. I wonder if we could use something like that as an initial partial aggregate pass: we would initially aggregate each batch partially as you describe and then update the overall aggregates from the partials.

@jhorstmann
Contributor

I tried to do this but I couldn't figure out how to do so using the std::hash::Hash API. I didn't find any way to return the hash value directly, only to update the intermediate value of a Hasher.

You're right, coming from a Java background I always forget that this works very differently in Rust.

It should work with insert_with_hasher though, since that callback gets the key as a parameter and returns a u64.

insert_hashed_no_check is a good one, though I think it still requires consistency between create_hashes and ScalarValue::hash for the case of collisions, right?

Yes, that is my understanding. The method is probably a bit dangerous, since inconsistencies will then only show up when the map gets resized.

Yes that is a cool idea. I wonder if we could use something like that as an initial partial aggregate pass: we would initially aggregate each batch partially as you describe and then update the overall aggregates from the partials.

That is also my idea, maybe not per batch, but to aggregate bigger partitions in parallel and then merge the results. The main benefit is not the hashing though, but that updating the accumulators can be done with a generic function instead of dynamic dispatch.

@Dandandan
Contributor

@Dandandan if you have a moment, I would like to know if you have any concerns with the "change create_hashes" function item above, before I spend significant time on it

I will try to have a better look at this later. The first feeling I have is that the example/proposal is:

  • More row-based than create_hashes as it is today. The important part of vectorized hashing is that the inner loop stays on the same array with the same type, and does not have to jump between memory locations and different parts of the code for each row.
  • It creates/keeps the more complex GroupKey per row, making key creation expensive (an allocation per key / not cache friendly) and making re-hashing of the key more expensive (no simple or even identity function as the hash).
  • Harder to vectorize further. My belief is that using the Rust HashMap is not really the end state of the hash join and hash aggregate, but an easier way to implement them.

It might still be an improvement over the current state (for hash aggregate); it looks like it simplifies some parts.

@alamb
Contributor Author

alamb commented Aug 5, 2021

I got enough of the approach described by @Dandandan in #790 (comment) working in https://github.com/apache/arrow-datafusion/pull/808/files to take some benchmarks.

Results look as follows:

(arrow_dev) alamb@MacBook-Pro:~/Software/arrow-datafusion$ critcmp gby_new gby_new2 master1 master2
group                                                gby_new                                gby_new2                               master1                                 master2
-----                                                -------                                --------                               -------                                 -------
aggregate_query_group_by                             1.01      2.9±0.18ms        ? ?/sec    1.00      2.9±0.16ms        ? ?/sec    1.05      3.0±0.20ms        ? ?/sec     1.17      3.4±0.39ms        ? ?/sec
aggregate_query_group_by_u64 15 12                   1.00      2.8±0.09ms        ? ?/sec    1.04      3.0±0.28ms        ? ?/sec    1.07      3.0±0.34ms        ? ?/sec     1.12      3.2±0.28ms        ? ?/sec
aggregate_query_group_by_with_filter                 1.02      2.1±0.09ms        ? ?/sec    1.02      2.1±0.08ms        ? ?/sec    1.00      2.0±0.06ms        ? ?/sec     1.02      2.1±0.16ms        ? ?/sec
aggregate_query_group_by_with_filter_u64 15 12       1.02      2.0±0.09ms        ? ?/sec    1.03      2.0±0.09ms        ? ?/sec    1.04      2.0±0.14ms        ? ?/sec     1.00  1973.5±90.65µs        ? ?/sec
aggregate_query_no_group_by 15 12                    1.04  1201.4±42.15µs        ? ?/sec    1.00  1152.4±25.40µs        ? ?/sec    1.03  1190.9±51.39µs        ? ?/sec     1.10  1268.5±258.16µs        ? ?/sec
aggregate_query_no_group_by_count_distinct_narrow    1.01      5.5±0.23ms        ? ?/sec    1.02      5.5±0.24ms        ? ?/sec    1.00      5.4±0.35ms        ? ?/sec     1.13      6.1±0.61ms        ? ?/sec
aggregate_query_no_group_by_count_distinct_wide      1.00      7.5±0.44ms        ? ?/sec    1.01      7.6±0.36ms        ? ?/sec    1.03      7.8±0.62ms        ? ?/sec     1.00      7.5±0.35ms        ? ?/sec
aggregate_query_no_group_by_min_max_f64              1.02  1191.2±61.07µs        ? ?/sec    1.00  1171.9±92.30µs        ? ?/sec    1.09  1279.7±145.46µs        ? ?/sec    1.01  1180.7±89.39µs        ? ?/sec

The good news is that we didn't slow down; the not-so-good news is that it isn't much faster either, though this may be related to the specific benches used. I will see if I can run the tpch benchmarks and do some profiling of where time is being spent.

Also, in the "good news" category is that I ran out of RAM trying to find hash collisions -- lol. I have an idea to fix the hashing function for the test but the current create_hashes function seems pretty good at avoiding collisions on u64 values.

Benchmarks run like this:

cargo bench --bench aggregate_query_sql -- --save-baseline gby_new

@Dandandan
Contributor

Cool. I am doing some tests with db-benchmark, which includes some more challenging queries.

@Dandandan
Contributor

Dandandan commented Aug 6, 2021

Some good news: this is mostly an improvement on the (more challenging) db-benchmark aggregates.

Master:

q1 took 37 ms
q2 took 325 ms
q3 took 1431 ms
q4 took 56 ms
q5 took 1287 ms
q7 took 1304 ms
q10 took 9380 ms

PR:

q1 took 36 ms
q2 took 360 ms
q3 took 1002 ms
q4 took 67 ms
q5 took 984 ms
q7 took 924 ms
q10 took 4024 ms

Performance of q10 is improved > 2x!
It looks like q2 and q4 are slightly slower (repeatably), but I think the overall improvement is too big for that to matter much.

@Dandandan
Contributor

Dandandan commented Aug 6, 2021

[profiling screenshot]

Some profiling output of the gby_null_new branch.

I think it shows that:

  • There is quite some time spent creating/dropping/mutating/comparing ScalarValue and converting it to an Array again.
    The same applies to the "accumulators". I think the "keep values/state in typed contiguous arrays" idea could remove most of this overhead.
  • Slicing / taking arrays takes some time, as does some intermediate Vec creation. This relates to the discussion about updating states by index instead of grouping them in the aggregation.

@alamb
Contributor Author

alamb commented Aug 6, 2021

Thanks @Dandandan -- I think that some of the time creating/dropping/comparing ScalarValue will go away when I complete the optimized implementation for all types in ScalarValue::eq_array -- as of now, grouping on any type other than Utf8, UInt64, F32 or F64 will take the slow path.

I also did some profiling with TPC-H q1 (which has a group by on two keys and no joins); my conclusion from that exercise is that this approach is about the same speed as the one on master.

Profiling command:

cargo run --release --bin tpch -- benchmark datafusion --iterations 10 --path ./data --format tbl --query 1 --batch-size 10000

On master, Query 1 avg time: 2874.42 ms

On the gby_null_new branch Query 1 avg time: 2904.64 ms

Which is well within the error bounds of my measurement setup.

My profiling suggests that Q1 spends 85% of the time parsing CSV data and approximately 15% of the time doing the aggregation. Of that 15%, 10% is in create_hashes and 3% is looking up in the hash table.

My next steps are:

  1. Create a benchmark program that is not IO bound, with string / string-dictionary grouping keys, for which I think this approach will be most beneficial
  2. Complete the optimized implementation of ScalarValue::eq_array, after which perhaps you can rerun your benchmarks and we can see if they get better

@Dandandan
Contributor

Dandandan commented Aug 6, 2021

Yeah, Q1 from TPC-H is comparable to Q1 of the db-benchmark: very few unique keys. The performance improvement listed above (db-benchmark q10 and others) is for a lot of groups, which have some other hot paths. I think this benchmark only uses the types you mentioned, so I don't expect anything from that. There are a few other queries in TPC-H that are a bit more challenging on the aggregation side (q3 maybe?), but the benchmarks are all relatively easy, as a lot of the data is filtered out on read and aggregated in big groups.

@Dandandan
Contributor

I see the benchmark also uses the int32 type, so it would be interesting to see whether that will speed things up.

@jhorstmann
Contributor

@alamb Some time ago I added the --mem-table parameter to the tpch binary to load the data into memory before running the queries, to give more reproducible times.

@alamb
Contributor Author

alamb commented Aug 6, 2021

@alamb Some time ago I added the --mem-table parameter to the tpch binary to load the data into memory before running the queries, to give more reproducible times.

@jhorstmann I was dreaming about just such a feature this morning! That is an excellent idea; I will give it a try later today!

@alamb
Contributor Author

alamb commented Aug 6, 2021

@alamb Some time ago I added the --mem-table parameter to the tpch binary to load the data into memory before running the queries, to give more reproducible times.

@jhorstmann -- I tried this and now the queries run too fast on SF 1 (70ms) lol -- I am trying to make SF10 data to see if that is reasonable enough for me to profile.

@Dandandan
Contributor

Dandandan commented Aug 6, 2021

@alamb Some time ago I added the --mem-table parameter to the tpch binary to load the data into memory before running the queries, to give more reproducible times.

@jhorstmann -- I tried this and now the queries run too fast on SF 1 (70ms) lol -- I am trying to make SF10 data to see if that is reasonable enough for me to profile.

Increasing the number of partitions (-n 16) speeds them up even further, given enough cores.

@alamb
Contributor Author

alamb commented Aug 6, 2021

TLDR; I did some more testing with a synthetic best-case harness, and I can see a small (8%) improvement for grouping on utf8 key data (average of 22 characters each). While not as much as I had hoped, it is good enough for me to polish this up and get it ready for code review.

Ok, I am out of time for the day but I will continue tomorrow.

Details: I whipped up a custom harness that is entirely CPU bound (feeds the same record batch in multiple times)

Roughly speaking, it tests grouping on 1 billion rows with 100 distinct keys, calculating COUNT() and AVG(f64) aggregates.

I currently have single-column group keys of each type: int64, utf8, and dict.

master:

Completed query in 6.674159987s
Completed query in 7.769550938s
Completed query in 7.467556697s
Completed query in 7.452186844s
Completed query in 7.379664866s
---------------
Completed 5 iterations query in 36.743119332s 136079899.88061365 rows/sec

on gby_null_new / #808

Completed query in 6.438788872s
Completed query in 6.55647331s
Completed query in 6.716707907s
Completed query in 6.931733327s
Completed query in 6.68060826s
---------------
Completed 5 iterations query in 33.324311676s 150040608.44866526 rows/sec

This gives an idea of what is being tested:

Benchmarking select utf8_key, count(*), avg(f64) from t group by utf8_key
  100000 batches of 10000 rows = 1000000000 total rows
explain select utf8_key, count(*), avg(f64) from t group by utf8_key
+---------------+---------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #t.utf8_key, #COUNT(UInt8(1)), #AVG(t.f64)                                                        |
|               |   Aggregate: groupBy=[[#t.utf8_key]], aggr=[[COUNT(UInt8(1)), AVG(#t.f64)]]                                   |
|               |     TableScan: t projection=Some([1, 3])                                                                      |
| physical_plan | ProjectionExec: expr=[utf8_key@0 as utf8_key, COUNT(UInt8(1))@1 as COUNT(UInt8(1)), AVG(t.f64)@2 as AVG(f64)] |
|               |   HashAggregateExec: mode=FinalPartitioned, gby=[utf8_key@0 as utf8_key], aggr=[COUNT(UInt8(1)), AVG(f64)]    |
|               |     CoalesceBatchesExec: target_batch_size=4096                                                               |
|               |       RepartitionExec: partitioning=Hash([Column { name: "utf8_key", index: 0 }], 16)                         |
|               |         HashAggregateExec: mode=Partial, gby=[utf8_key@0 as utf8_key], aggr=[COUNT(UInt8(1)), AVG(f64)]       |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16)                                                   |
|               |             RepeatExec repeat=100000                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------+

@Dandandan
Contributor

Dandandan commented Aug 7, 2021

Great! Maybe it would also be good to add a benchmark that groups on unique UTF-8 keys (1 group per key), to test a workload like deduplication. Probably there is a much larger gain there, like in the db-benchmark case.

@alamb
Contributor Author

alamb commented Aug 7, 2021

@Dandandan how do you run db-benchmark against datafusion? Are you using h2oai/db-benchmark#182

@Dandandan
Contributor

Yes.

To generate the data:

Rscript _data/groupby-datagen.R 1e7 1e2 0 0

Then change the implementation in Cargo.toml

datafusion = { git = "https://github.com/alamb/arrow-datafusion", branch="alamb/gby_null_new", features = ["simd"]}

And running:

SRC_DATANAME=G1_1e7_1e2_0_0 RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release

@Dandandan
Contributor

Dandandan commented Aug 7, 2021

My results on the latest version.

q1 took 36 ms
q2 took 358 ms
q3 took 998 ms
q4 took 50 ms
q5 took 983 ms
q7 took 911 ms
q10 took 4075 ms

q4 is improved in the latest version compared to earlier (it used an int32 column to group on). q2 still looks a bit (~10%) slower.

The query is: SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2 (id1, id2 are utf8, v1 is an int32)

I am wondering whether this comment https://github.com/apache/arrow-datafusion/pull/808/files#r683975473 might help a bit, as the current code does some additional cloning of ScalarValues.

Another cause could be that hashing and comparing one Vec<u8> might be faster than hashing two single strings and combining them afterwards (however, I would have expected the extra copying / rehashing to be worse than the single cost of hashing itself).

@Dandandan
Contributor

Dandandan commented Aug 7, 2021

Some profiling results reveal that a large part of the time is now spent in ScalarValue::iter_to_array (and related calls inside the aggregation code) and in eq_array.

The first one could be partly resolved by updating/introducing a method that takes &ScalarValue values instead of owned ones, which currently requires some extra clones. That will probably require a lot of updating though, around the accumulators, etc.

And it goes away entirely once the actual array contents can be stored in an array directly from the start, which I think is the longer term plan.

@sundy-li
Contributor

sundy-li commented Aug 9, 2021

Another cause could be that hashing and comparing one Vec<u8> might be faster than hashing two single strings and combining them afterwards (however, I would have expected the extra copying / rehashing to be worse than the single cost of hashing itself).

Introducing variant hash methods would help in this case.
E.g.:

For a query which groups by 3 columns of types [u8, u8, u16], a fixed-width u32 hash key will be enough (see the sketch after the links below):

  1. We can allocate one large fixed-size allocation instead of multiple Vec allocations.
  2. The fixed-size key reduces the hash map's memory size.

Refer:
https://github.com/datafuselabs/datafuse/blob/master/common/datablocks/src/kernels/data_block_group_by.rs#L17-L36

https://github.com/datafuselabs/datafuse/blob/master/common/datablocks/src/kernels/data_block_group_by_hash.rs#L264-L274
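A sketch of that fixed-width packing (illustrative only, not the datafuse code linked above):

// Pack three small group-key columns into one fixed-width u32 key:
// byte 0 = a, byte 1 = b, upper two bytes = c.
fn fixed_key(a: u8, b: u8, c: u16) -> u32 {
    (a as u32) | ((b as u32) << 8) | ((c as u32) << 16)
}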

And I think in an OLAP system the hash key should always be unique (no rehashing needed).

@alamb
Contributor Author

alamb commented Aug 9, 2021

And I think in an OLAP system the hash key should always be unique (no rehashing needed).

Thanks for the pointer @sundy-li. I agree that the value being hashed is always unique. I do think in very rare cases it is possible for multiple different input values to produce the same output hash value -- aka collisions do happen.

The first one could be partly resolved by updating/introducing a method that takes &ScalarValue values instead of owned ones, which currently requires some extra clones. That will probably require a lot of updating though, around the accumulators, etc.

And it goes away entirely once the actual array contents can be stored in an array directly from the start, which I think is the longer term plan.

@Dandandan yes, I think building up the actual array content (rather than using ScalarValue) would be the best approach here. I tried to rework the output creation code to avoid copying ScalarValue (by taking ownership) but I couldn't figure out how to do it, as the table is stored row-wise (one row with multiple group columns).

@alamb
Contributor Author

alamb commented Aug 9, 2021

FWIW I would expect ScalarValue::iter_to_array to show up in profiles only for queries that had large numbers of groups where the time spent creating the output was a substantial part of the query time.

@Dandandan
Contributor

@sundy-li

Yes, I think for some types the hashing method might be further specialized, to speed up the hashing or to reduce the amount of memory needed for the hash value instead of always using u64. I think in our case that's currently a small cost / win compared to the other gains we might get, but still interesting to try at some point.

I think for small ranges / data types we can even avoid using a hash table and move to direct indexing instead. That might be interesting for u8 values or small dictionaries.
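For example, a direct-indexing sketch for a single u8 group key (illustrative; a real version would update per-group accumulators instead of counts):

// A u8 key has at most 256 distinct values, so the "table" can be a
// plain array indexed directly by the key -- no hashing at all.
fn count_groups_u8(keys: &[u8]) -> Vec<(u8, u64)> {
    let mut counts = [0u64; 256];
    for &k in keys {
        counts[k as usize] += 1;
    }
    (0..=255u8)
        .filter(|&k| counts[k as usize] > 0)
        .map(|k| (k, counts[k as usize]))
        .collect()
}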

@Dandandan
Contributor

FWIW I would expect ScalarValue::iter_to_array to show up in profiles only for queries that had large numbers of groups where the time spent creating the output was a substantial part of the query time.

Yes, I believe this is the case based on the profiling results.

@alamb
Contributor Author

alamb commented Aug 9, 2021

@sundy-li the idea of special-casing fixed length types into fixed length keys is a great one, FWIW. I think we would probably get a non-trivial performance speedup for those primitive types.

I filed #846 to track this idea for DataFusion. Thank you

@alamb alamb self-assigned this Aug 9, 2021
@alamb
Contributor Author

alamb commented Aug 9, 2021

#808 is now ready for review by a wider group (no pun intended)
