Skip to content

Commit

Permalink
feat: Add xxhash64 function support
Browse files Browse the repository at this point in the history
  • Loading branch information
advancedxy committed May 14, 2024
1 parent 3808306 commit dd9738d
Show file tree
Hide file tree
Showing 6 changed files with 722 additions and 53 deletions.
2 changes: 2 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ once_cell = "1.18.0"
regex = "1.9.6"
crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
twox-hash = "1.6.3"

[build-dependencies]
prost-build = "0.9.0"
Expand Down
49 changes: 48 additions & 1 deletion core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

use crate::execution::datafusion::spark_hash::create_hashes;
use crate::execution::datafusion::spark_hash::{create_hashes, create_xxhash64_hashes};
use arrow::{
array::{
ArrayRef, AsArray, Decimal128Builder, Float32Array, Float64Array, GenericStringArray,
Expand Down Expand Up @@ -119,6 +119,10 @@ pub fn create_comet_physical_fun(
let func = Arc::new(spark_murmur3_hash);
make_comet_scalar_udf!("murmur3_hash", func, without data_type)
}
"xxhash64" => {
let func = Arc::new(spark_xxhash64);
make_comet_scalar_udf!("xxhash64", func, without data_type)
}
sha if sha2_functions.contains(&sha) => {
// Spark requires hex string as the result of sha2 functions, we have to wrap the
// result of digest functions as hex string
Expand Down Expand Up @@ -672,6 +676,49 @@ fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusio
}
}

fn spark_xxhash64(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
let length = args.len();
let seed = &args[length - 1];
match seed {
ColumnarValue::Scalar(ScalarValue::Int64(Some(seed))) => {
// iterate over the arguments to find out the length of the array
let num_rows = args[0..args.len() - 1]
.iter()
.find_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.len()),
ColumnarValue::Scalar(_) => None,
})
.unwrap_or(1);
let mut hashes: Vec<u64> = vec![0_u64; num_rows];
hashes.fill(*seed as u64);
let arrays = args[0..args.len() - 1]
.iter()
.map(|arg| match arg {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => {
scalar.clone().to_array_of_size(num_rows).unwrap()
}
})
.collect::<Vec<ArrayRef>>();
create_xxhash64_hashes(&arrays, &mut hashes)?;
if num_rows == 1 {
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
hashes[0] as i64,
))))
} else {
let hashes: Vec<i64> = hashes.into_iter().map(|x| x as i64).collect();
Ok(ColumnarValue::Array(Arc::new(Int64Array::from(hashes))))
}
}
_ => {
internal_err!(
"The seed of function xxhash64 must be an Int64 scalar value, but got: {:?}.",
seed
)
}
}
}

#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
let mut s = String::with_capacity(data.as_ref().len() * 2);
Expand Down
Loading

0 comments on commit dd9738d

Please sign in to comment.