diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index ecc434a5af5c..0c0b3663b9d8 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -39,7 +39,7 @@ path = "src/lib.rs"
 [features]
 default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
 simd = ["arrow/simd"]
-crypto_expressions = ["md-5", "sha2", "blake2"]
+crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
 regex_expressions = ["regex", "lazy_static"]
 unicode_expressions = ["unicode-segmentation"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
@@ -65,6 +65,7 @@ log = "^0.4"
 md-5 = { version = "^0.9.1", optional = true }
 sha2 = { version = "^0.9.1", optional = true }
 blake2 = { version = "^0.9.2", optional = true }
+blake3 = { version = "1.0", optional = true }
 ordered-float = "2.0"
 unicode-segmentation = { version = "^1.7.1", optional = true }
 regex = { version = "^1.4.3", optional = true }
diff --git a/datafusion/src/physical_plan/crypto_expressions.rs b/datafusion/src/physical_plan/crypto_expressions.rs
index 8c575bcd0264..25d63b81ce1b 100644
--- a/datafusion/src/physical_plan/crypto_expressions.rs
+++ b/datafusion/src/physical_plan/crypto_expressions.rs
@@ -29,6 +29,7 @@ use arrow::{
     datatypes::DataType,
 };
 use blake2::{Blake2b, Blake2s, Digest};
+use blake3::Hasher as Blake3;
 use md5::Md5;
 use sha2::{Sha224, Sha256, Sha384, Sha512};
 use std::any::type_name;
@@ -51,6 +52,7 @@ enum DigestAlgorithm {
     Sha512,
     Blake2s,
     Blake2b,
+    Blake3,
 }

 fn digest_process(
@@ -117,6 +119,11 @@
             Self::Sha512 => digest_to_scalar!(Sha512, value),
             Self::Blake2b => digest_to_scalar!(Blake2b, value),
             Self::Blake2s => digest_to_scalar!(Blake2s, value),
+            Self::Blake3 => ScalarValue::Binary(value.as_ref().map(|v| {
+                let mut digest = Blake3::default();
+                digest.update(v.as_bytes());
+                digest.finalize().as_bytes().to_vec()
+            })),
         })
     }

@@ -142,6 +149,19 @@
             Self::Sha512 => digest_to_array!(Sha512, input_value),
             Self::Blake2b => digest_to_array!(Blake2b, input_value),
             Self::Blake2s => digest_to_array!(Blake2s, input_value),
+            Self::Blake3 => {
+                let binary_array: BinaryArray = input_value
+                    .iter()
+                    .map(|opt| {
+                        opt.map(|x| {
+                            let mut digest = Blake3::default();
+                            digest.update(x.as_bytes());
+                            digest.finalize().as_bytes().to_vec()
+                        })
+                    })
+                    .collect();
+                Arc::new(binary_array)
+            }
         };
         Ok(ColumnarValue::Array(array))
     }
@@ -164,11 +184,27 @@
             "sha512" => Self::Sha512,
             "blake2b" => Self::Blake2b,
             "blake2s" => Self::Blake2s,
+            "blake3" => Self::Blake3,
             _ => {
+                let options = [
+                    Self::Md5,
+                    Self::Sha224,
+                    Self::Sha256,
+                    Self::Sha384,
+                    Self::Sha512,
+                    Self::Blake2s,
+                    Self::Blake2b,
+                    Self::Blake3,
+                ]
+                .iter()
+                .map(|i| i.to_string())
+                .collect::<Vec<String>>()
+                .join(", ");
                 return Err(DataFusionError::Plan(format!(
-                    "There is no built-in digest algorithm named {}",
-                    name
-                )))
+                    "There is no built-in digest algorithm named '{}', currently supported algorithms are: {}",
+                    name,
+                    options,
+                )));
             }
         })
     }
@@ -271,6 +307,11 @@ define_digest_function!(
     Blake2s,
     "computes blake2s hash digest of the given input"
 );
+define_digest_function!(
+    blake3,
+    Blake3,
+    "computes blake3 hash digest of the given input"
+);

 /// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
 /// Second argument is the algorithm to use.
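Note on the new match arms above: the existing algorithms go through the digest_to_scalar! / digest_to_array! macros, which rely on the digest::Digest trait, and the blake3 crate's Hasher does not implement that trait by default, which is why the Blake3 arms are written out by hand. Below is a minimal standalone sketch of that hashing pattern, assuming only blake3 = "1.0" as added to Cargo.toml; the helper name blake3_digest is illustrative and not part of the patch.

    use blake3::Hasher as Blake3;

    /// Hash a string value the way the new Blake3 arms do: feed the UTF-8
    /// bytes into a Hasher and collect the 32-byte digest as a Vec<u8>.
    fn blake3_digest(value: &str) -> Vec<u8> {
        let mut digest = Blake3::default();
        digest.update(value.as_bytes());
        digest.finalize().as_bytes().to_vec()
    }

    fn main() {
        let out = blake3_digest("tom");
        assert_eq!(out.len(), 32);
        // Render as lowercase hex, the form the SQL tests compare against.
        let hex: String = out.iter().map(|b| format!("{:02x}", b)).collect();
        println!("{}", hex);
    }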
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e82254203af0..6c85f35d6855 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -4079,6 +4079,10 @@ async fn test_crypto_expressions() -> Result<()> {
     test_expression!(
         "digest('tom','blake2s')",
         "5fc3f2b3a07cade5023c3df566e4d697d3823ba1b72bfb3e84cf7e768b2e7529"
     );
+    test_expression!(
+        "digest('','blake3')",
+        "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262"
+    );
     Ok(())
 }
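The expected value in the new test case is the BLAKE3 digest of the empty string; in SQL the algorithm is selected by the second argument to digest, e.g. digest(col, 'blake3'). A quick way to cross-check the vector outside of DataFusion, using only the blake3 crate (assuming version 1.0 as in Cargo.toml), is sketched below.

    fn main() {
        // blake3::hash is the crate's one-shot convenience API.
        let hash = blake3::hash(b"");
        assert_eq!(
            hash.to_hex().as_str(),
            "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262"
        );
    }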