-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement more efficient version of xxhash64 #575
Changes from all commits
e770048
47bb4c1
6fd6930
07f5b55
91c9aea
f64dc90
50539c4
d1b975e
5869553
edcde27
0ba9134
3c20411
e979ee9
45187bd
420a87b
3ffa90d
e57c8d6
96c2bcf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
Apache DataFusion Comet | ||
Copyright 2024 The Apache Software Foundation | ||
|
||
This product includes software developed at | ||
The Apache Software Foundation (http://www.apache.org/). | ||
|
||
This product includes software from the twox-hash project (MIT License) | ||
https://github.com/shepmaster/twox-hash |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! xxhash64 implementation | ||
|
||
/// Number of bytes consumed per stripe of the main loop (four u64 lanes).
const CHUNK_SIZE: usize = 32;

// The five xxhash64 prime constants, identical to the values used by the
// reference implementation at <https://github.com/Cyan4973/xxHash>.
const PRIME_1: u64 = 11_400_714_785_074_694_791;
const PRIME_2: u64 = 14_029_467_366_897_019_727;
const PRIME_3: u64 = 1_609_587_929_392_839_161;
const PRIME_4: u64 = 9_650_029_242_287_828_579;
const PRIME_5: u64 = 2_870_177_450_012_600_261;

/// Custom implementation of xxhash64 based on code from
/// <https://github.com/shepmaster/twox-hash> but optimized for our use case by
/// removing any intermediate buffering, which is not required because we are
/// operating on data that is already in memory.
///
/// This version reads the input through `chunks_exact` +
/// `u64::from_le_bytes`/`u32::from_le_bytes` instead of raw unaligned pointer
/// reads, so the whole function is safe Rust while still compiling down to
/// unaligned little-endian loads.
#[inline]
pub(crate) fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
    let data: &[u8] = data.as_ref();
    let length_bytes = data.len();

    // Lane accumulators, seeded exactly as the reference implementation does.
    let mut v1 = seed.wrapping_add(PRIME_1).wrapping_add(PRIME_2);
    let mut v2 = seed.wrapping_add(PRIME_2);
    let mut v3 = seed;
    let mut v4 = seed.wrapping_sub(PRIME_1);

    // Main loop: consume 32-byte stripes, folding one u64 into each lane.
    let mut stripes = data.chunks_exact(CHUNK_SIZE);
    for stripe in &mut stripes {
        v1 = ingest_one_number(v1, u64::from_le_bytes(stripe[0..8].try_into().unwrap()));
        v2 = ingest_one_number(v2, u64::from_le_bytes(stripe[8..16].try_into().unwrap()));
        v3 = ingest_one_number(v3, u64::from_le_bytes(stripe[16..24].try_into().unwrap()));
        v4 = ingest_one_number(v4, u64::from_le_bytes(stripe[24..32].try_into().unwrap()));
    }
    // Bytes left over after the last full 32-byte stripe (0..=31 bytes).
    let mut remainder = stripes.remainder();

    let mut hash = if length_bytes >= CHUNK_SIZE {
        // We have processed at least one full stripe: merge the four lanes.
        let mut hash = v1
            .rotate_left(1)
            .wrapping_add(v2.rotate_left(7))
            .wrapping_add(v3.rotate_left(12))
            .wrapping_add(v4.rotate_left(18));

        hash = mix_one(hash, v1);
        hash = mix_one(hash, v2);
        hash = mix_one(hash, v3);
        hash = mix_one(hash, v4);

        hash
    } else {
        // Input shorter than one stripe: the lane accumulators were never used.
        seed.wrapping_add(PRIME_5)
    };

    hash = hash.wrapping_add(length_bytes as u64);

    // Fold any remaining whole u64s (at most three) into the hash.
    let mut u64_tail = remainder.chunks_exact(8);
    for chunk in &mut u64_tail {
        let mut k1 = u64::from_le_bytes(chunk.try_into().unwrap()).wrapping_mul(PRIME_2);
        k1 = k1.rotate_left(31);
        k1 = k1.wrapping_mul(PRIME_1);
        hash ^= k1;
        hash = hash.rotate_left(27);
        hash = hash.wrapping_mul(PRIME_1);
        hash = hash.wrapping_add(PRIME_4);
    }
    remainder = u64_tail.remainder();

    // Fold a remaining whole u32 (at most one) into the hash.
    let mut u32_tail = remainder.chunks_exact(4);
    for chunk in &mut u32_tail {
        let k1 = u64::from(u32::from_le_bytes(chunk.try_into().unwrap())).wrapping_mul(PRIME_1);
        hash ^= k1;
        hash = hash.rotate_left(23);
        hash = hash.wrapping_mul(PRIME_2);
        hash = hash.wrapping_add(PRIME_3);
    }

    // Fold the final 0..=3 bytes into the hash one at a time.
    for &byte in u32_tail.remainder() {
        let k1 = u64::from(byte).wrapping_mul(PRIME_5);
        hash ^= k1;
        hash = hash.rotate_left(11);
        hash = hash.wrapping_mul(PRIME_1);
    }

    // The final intermixing (avalanche) step.
    hash ^= hash >> 33;
    hash = hash.wrapping_mul(PRIME_2);
    hash ^= hash >> 29;
    hash = hash.wrapping_mul(PRIME_3);
    hash ^= hash >> 32;

    hash
}

/// One round of the main loop: fold a little-endian u64 into a lane
/// accumulator (`XXH64_round` in the reference implementation).
#[inline(always)]
fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 {
    value = value.wrapping_mul(PRIME_2);
    current_value = current_value.wrapping_add(value);
    current_value = current_value.rotate_left(31);
    current_value.wrapping_mul(PRIME_1)
}

/// Merge a lane accumulator into the final hash
/// (`XXH64_mergeRound` in the reference implementation).
#[inline(always)]
fn mix_one(mut hash: u64, mut value: u64) -> u64 {
    value = value.wrapping_mul(PRIME_2);
    value = value.rotate_left(31);
    value = value.wrapping_mul(PRIME_1);
    hash ^= value;
    hash = hash.wrapping_mul(PRIME_1);
    hash.wrapping_add(PRIME_4)
}
|
||
#[cfg(test)]
mod test {
    use super::spark_compatible_xxhash64;
    use rand::Rng;
    use std::hash::Hasher;
    use twox_hash::XxHash64;

    /// Hash `data` with both the `twox-hash` crate and our implementation,
    /// panicking with a descriptive message if the two results disagree.
    fn check_xxhash64(data: &[u8], seed: u64) {
        let hash1 = {
            let mut hasher = XxHash64::with_seed(seed);
            hasher.write(data.as_ref());
            hasher.finish()
        };
        let hash2 = spark_compatible_xxhash64(data, seed);
        if hash1 != hash2 {
            panic!("input: {} with seed {seed} produced incorrect hash (comet={hash2}, twox-hash={hash1})",
                data.iter().map(|byte| format!("{:02x}", byte)).collect::<String>())
        }
    }

    /// Cross-check random inputs of every length in 0..128 (ten random
    /// seeds per length) against the `twox-hash` reference implementation.
    #[test]
    fn test_xxhash64_random() {
        let mut rng = rand::thread_rng();
        for len in 0..128 {
            for _ in 0..10 {
                let data: Vec<u8> = (0..len).map(|_| rng.gen()).collect();
                let seed = rng.gen();
                check_xxhash64(&data, seed);
            }
        }
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like arrow-rs mentions the file names. Should we mention
core/src/execution/datafusion/expressions/xxhash64.rs
here?