diff --git a/Cargo.lock b/Cargo.lock index d7ebc2ccd01bf..7593ea78731c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "nodrop", ] +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "arrayvec" version = "0.7.2" @@ -491,6 +497,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "bitvec" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41262f11d771fd4a61aa3ce019fca363b4b6c282fca9da2a31186d3965a47a5c" +dependencies = [ + "either", + "radium 0.3.0", +] + [[package]] name = "bitvec" version = "0.22.3" @@ -498,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5237f00a8c86130a0cc317830e558b966dd7850d48a953d998c813f01a41b527" dependencies = [ "funty", - "radium", + "radium 0.6.2", "tap", "wyz", ] @@ -574,6 +590,12 @@ version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" +[[package]] +name = "byte-slice-cast" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0a5e3906bcbf133e33c1d4d95afc664ad37fbdb9f6568d8043e7ea8c27d93d3" + [[package]] name = "byte-unit" version = "4.0.14" @@ -1028,6 +1050,7 @@ dependencies = [ "common-exception", "common-io", "pretty_assertions", + "primitive-types", "regex", "serde", "serde_json", @@ -1055,6 +1078,7 @@ dependencies = [ "ordered-float 2.10.0", "paste", "pretty_assertions", + "primitive-types", "rand 0.8.5", "serde", "serde_json", @@ -2007,6 +2031,7 @@ dependencies = [ "petgraph", "poem", "pretty_assertions", + "primitive-types", "prost 0.10.1", "rand 0.8.5", "regex", @@ -2386,6 +2411,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "fixed-hash" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3367952ceb191f4ab95dd5685dc163ac539e36202f9fcfd0cb22f9f9c542fefc" +dependencies = [ + "byteorder", + "rand 0.7.3", + "rustc-hex", + "static_assertions", +] + [[package]] name = "fixedbitset" version = "0.4.1" @@ -3144,6 +3181,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "impl-codec" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be51a921b067b0eaca2fad532d9400041561aa922221cc65f95a85641c6bf53" +dependencies = [ + "parity-scale-codec", +] + [[package]] name = "indexmap" version = "1.8.2" @@ -3941,7 +3987,7 @@ dependencies = [ "bigdecimal", "bindgen", "bitflags", - "bitvec", + "bitvec 0.22.3", "byteorder", "bytes 1.1.0", "cc", @@ -4509,6 +4555,18 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "parity-scale-codec" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4b26b16c7687c3075982af47719e481815df30bc544f7a6690763a25ca16e9d" +dependencies = [ + "arrayvec 0.5.2", + "bitvec 0.17.4", + "byte-slice-cast", + "serde", +] + [[package]] name = "parking" version = "2.0.0" @@ -4961,6 +5019,17 @@ dependencies = [ "syn", ] +[[package]] +name = "primitive-types" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4336f4f5d5524fa60bcbd6fe626f9223d8142a50e7053e979acdf0da41ab975" +dependencies = [ + "fixed-hash", + "impl-codec", + "uint", +] + [[package]] name = "proc-macro-crate" version = "1.1.3" @@ -5245,6 +5314,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" + [[package]] name = "radium" version = "0.6.2" @@ -5643,6 +5718,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hex" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" + [[package]] name = "rustc_version" version = "0.4.0" @@ -7052,6 +7133,18 @@ dependencies = [ "syn", ] +[[package]] +name = "uint" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9db035e67dfaf7edd9aebfe8676afcd63eed53c8a4044fed514c8cccf1835177" +dependencies = [ + "byteorder", + "crunchy", + "rustc-hex", + "static_assertions", +] + [[package]] name = "uncased" version = "0.9.7" diff --git a/common/datablocks/Cargo.toml b/common/datablocks/Cargo.toml index d933f7c5729c9..9d5407c29ff90 100644 --- a/common/datablocks/Cargo.toml +++ b/common/datablocks/Cargo.toml @@ -23,6 +23,7 @@ common-io = { path = "../io" } # Crates.io dependencies ahash = "0.7.6" comfy-table = "5.0.1" +primitive-types = "0.6.1" regex = "1.5.5" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" diff --git a/common/datablocks/src/kernels/data_block_group_by.rs b/common/datablocks/src/kernels/data_block_group_by.rs index a1fa28e2f98a3..ae5f8ace1b766 100644 --- a/common/datablocks/src/kernels/data_block_group_by.rs +++ b/common/datablocks/src/kernels/data_block_group_by.rs @@ -26,6 +26,9 @@ use crate::kernels::HashMethodKind; use crate::kernels::HashMethodSerializer; use crate::DataBlock; use crate::HashMethod; +use crate::HashMethodKeysU128; +use crate::HashMethodKeysU256; +use crate::HashMethodKeysU512; use crate::HashMethodSingleString; impl DataBlock { @@ -80,7 +83,9 @@ impl DataBlock { 2 => Ok(HashMethodKind::KeysU16(HashMethodKeysU16::default())), 3..=4 => Ok(HashMethodKind::KeysU32(HashMethodKeysU32::default())), 5..=8 => Ok(HashMethodKind::KeysU64(HashMethodKeysU64::default())), - // TODO support u128, u256 + 9..=16 => Ok(HashMethodKind::KeysU128(HashMethodKeysU128::default())), + 17..=32 => Ok(HashMethodKind::KeysU256(HashMethodKeysU256::default())), + 33..=64 => Ok(HashMethodKind::KeysU512(HashMethodKeysU512::default())), _ => Ok(HashMethodKind::Serializer(HashMethodSerializer::default())), } } @@ -138,6 +143,31 @@ impl DataBlock { .collect(); blocks } + + HashMethodKind::KeysU128(s) => { + let blocks = s + .group_by(block, column_names)? + .iter() + .map(|(_, _, b)| b.clone()) + .collect(); + blocks + } + HashMethodKind::KeysU256(s) => { + let blocks = s + .group_by(block, column_names)? + .iter() + .map(|(_, _, b)| b.clone()) + .collect(); + blocks + } + HashMethodKind::KeysU512(s) => { + let blocks = s + .group_by(block, column_names)? + .iter() + .map(|(_, _, b)| b.clone()) + .collect(); + blocks + } }) } } diff --git a/common/datablocks/src/kernels/data_block_group_by_hash.rs b/common/datablocks/src/kernels/data_block_group_by_hash.rs index 13da6b15984df..1e412786c7e1e 100644 --- a/common/datablocks/src/kernels/data_block_group_by_hash.rs +++ b/common/datablocks/src/kernels/data_block_group_by_hash.rs @@ -21,6 +21,8 @@ use std::ops::Not; use common_datavalues::prelude::*; use common_exception::Result; use common_io::prelude::FormatSettings; +use primitive_types::U256; +use primitive_types::U512; use crate::DataBlock; @@ -126,6 +128,9 @@ pub type HashMethodKeysU8 = HashMethodFixedKeys; pub type HashMethodKeysU16 = HashMethodFixedKeys; pub type HashMethodKeysU32 = HashMethodFixedKeys; pub type HashMethodKeysU64 = HashMethodFixedKeys; +pub type HashMethodKeysU128 = HashMethodFixedKeys; +pub type HashMethodKeysU256 = HashMethodFixedKeys; +pub type HashMethodKeysU512 = HashMethodFixedKeys; /// These methods are `generic` method to generate hash key, /// that is the 'numeric' or 'binary` representation of each column value as hash key. @@ -136,6 +141,9 @@ pub enum HashMethodKind { KeysU16(HashMethodKeysU16), KeysU32(HashMethodKeysU32), KeysU64(HashMethodKeysU64), + KeysU128(HashMethodKeysU128), + KeysU256(HashMethodKeysU256), + KeysU512(HashMethodKeysU512), } impl HashMethodKind { @@ -147,6 +155,9 @@ impl HashMethodKind { HashMethodKind::KeysU16(v) => v.name(), HashMethodKind::KeysU32(v) => v.name(), HashMethodKind::KeysU64(v) => v.name(), + HashMethodKind::KeysU128(v) => v.name(), + HashMethodKind::KeysU256(v) => v.name(), + HashMethodKind::KeysU512(v) => v.name(), } } pub fn data_type(&self) -> DataTypeImpl { @@ -157,6 +168,9 @@ impl HashMethodKind { HashMethodKind::KeysU16(_) => u16::to_data_type(), HashMethodKind::KeysU32(_) => u32::to_data_type(), HashMethodKind::KeysU64(_) => u64::to_data_type(), + HashMethodKind::KeysU128(_) => Vu8::to_data_type(), + HashMethodKind::KeysU256(_) => Vu8::to_data_type(), + HashMethodKind::KeysU512(_) => Vu8::to_data_type(), } } } @@ -274,14 +288,16 @@ pub struct HashMethodFixedKeys { impl HashMethodFixedKeys where T: PrimitiveType { - pub fn default() -> Self { - HashMethodFixedKeys { t: PhantomData } - } - #[inline] pub fn get_key(&self, column: &PrimitiveColumn, row: usize) -> T { unsafe { column.value_unchecked(row) } } +} + +impl HashMethodFixedKeys { + pub fn default() -> Self { + HashMethodFixedKeys { t: PhantomData } + } pub fn deserialize_group_columns( &self, @@ -375,9 +391,7 @@ where T: PrimitiveType } impl HashMethod for HashMethodFixedKeys -where - T: PrimitiveType, - T: std::cmp::Eq + Hash + Clone + Debug, +where T: std::cmp::Eq + Hash + Clone + Debug + Default + 'static { type HashKey<'a> = T; diff --git a/common/datavalues/Cargo.toml b/common/datavalues/Cargo.toml index d11750e0ecce5..fb40fe8fc9205 100644 --- a/common/datavalues/Cargo.toml +++ b/common/datavalues/Cargo.toml @@ -17,6 +17,7 @@ common-base = { path = "../base" } common-exception = { path = "../exception" } common-io = { path = "../io" } common-macros = { path = "../macros" } +primitive-types = "0.6.1" # Github dependencies opensrv-clickhouse = { git = "https://github.com/datafuselabs/opensrv", rev = "15786d3", package = "opensrv-clickhouse" } diff --git a/common/datavalues/src/types/type_traits.rs b/common/datavalues/src/types/type_traits.rs index f76bf16ce467e..738b936e27a1c 100644 --- a/common/datavalues/src/types/type_traits.rs +++ b/common/datavalues/src/types/type_traits.rs @@ -13,7 +13,11 @@ // limitations under the License. use common_arrow::arrow::compute::arithmetics::basic::NativeArithmetics; +use common_exception::ErrorCode; +use common_exception::Result; use num::NumCast; +use primitive_types::U256; +use primitive_types::U512; use serde::de::DeserializeOwned; use serde::Serialize; @@ -138,3 +142,49 @@ impl ObjectType for VariantValue { VariantType::new_impl() } } + +pub trait LargePrimitive: Default + Sized + 'static { + const BYTE_SIZE: usize; + fn serialize_to(&self, _bytes: &mut [u8]); + fn from_bytes(v: &[u8]) -> Result; +} + +impl LargePrimitive for u128 { + const BYTE_SIZE: usize = 16; + fn serialize_to(&self, bytes: &mut [u8]) { + let bs = self.to_le_bytes(); + bytes.copy_from_slice(&bs); + } + + fn from_bytes(v: &[u8]) -> Result { + let bs: [u8; 16] = v.try_into().map_err(|_| { + ErrorCode::StrParseError(format!( + "Unable to parse into u128, unexpected byte size: {}", + v.len() + )) + })?; + Ok(u128::from_le_bytes(bs)) + } +} + +impl LargePrimitive for U256 { + const BYTE_SIZE: usize = 32; + fn serialize_to(&self, bytes: &mut [u8]) { + self.to_little_endian(bytes); + } + + fn from_bytes(v: &[u8]) -> Result { + Ok(U256::from_little_endian(v)) + } +} + +impl LargePrimitive for U512 { + const BYTE_SIZE: usize = 64; + fn serialize_to(&self, bytes: &mut [u8]) { + self.to_little_endian(bytes); + } + + fn from_bytes(v: &[u8]) -> Result { + Ok(U512::from_little_endian(v)) + } +} diff --git a/query/Cargo.toml b/query/Cargo.toml index 41fc11a67eed1..c615e0c5d3b7c 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -94,6 +94,7 @@ openssl = { version = "0.10", features = ["vendored"] } paste = "1.0.7" petgraph = "0.6.0" poem = { version = "=1.3.16", features = ["rustls", "multipart", "compression"] } +primitive-types = "0.6.2" prost = "=0.10.1" rand = "0.8.5" regex = "1.5.5" diff --git a/query/src/common/hashtable/hash_table_key.rs b/query/src/common/hashtable/hash_table_key.rs index 73bd4ec947afc..dbf534d0e0f90 100644 --- a/query/src/common/hashtable/hash_table_key.rs +++ b/query/src/common/hashtable/hash_table_key.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use primitive_types::U256; +use primitive_types::U512; + pub trait HashTableKeyable: Eq + Sized { const BEFORE_EQ_HASH: bool; @@ -57,3 +60,62 @@ primitive_hasher_impl!(u8); primitive_hasher_impl!(u16); primitive_hasher_impl!(u32); primitive_hasher_impl!(u64); + +impl HashTableKeyable for u128 { + const BEFORE_EQ_HASH: bool = false; + #[inline(always)] + fn is_zero(&self) -> bool { + *self == 0u128 + } + + #[inline(always)] + fn fast_hash(&self) -> u64 { + let mut hash_value = *self; + hash_value ^= hash_value >> 33; + hash_value = hash_value.wrapping_mul(0xff51afd7ed558ccd_u128); + hash_value ^= hash_value >> 33; + hash_value = hash_value.wrapping_mul(0xc4ceb9fe1a85ec53_u128); + hash_value ^= hash_value >> 33; + + hash_value as u64 + } + #[inline(always)] + fn set_key(&mut self, new_value: &u128) { + *self = *new_value; + } +} + +impl HashTableKeyable for U256 { + const BEFORE_EQ_HASH: bool = false; + #[inline(always)] + fn is_zero(&self) -> bool { + self.is_zero() + } + #[inline(always)] + fn fast_hash(&self) -> u64 { + self.low_u128().fast_hash() ^ (*self >> 128).low_u128().fast_hash() + } + #[inline(always)] + fn set_key(&mut self, new_value: &U256) { + *self = *new_value; + } +} + +impl HashTableKeyable for U512 { + const BEFORE_EQ_HASH: bool = false; + #[inline(always)] + fn is_zero(&self) -> bool { + self.is_zero() + } + #[inline(always)] + fn fast_hash(&self) -> u64 { + self.low_u128().fast_hash() + ^ (*self >> 128).low_u128().fast_hash() + ^ (*self >> 256).low_u128().fast_hash() + ^ (*self >> 384).low_u128().fast_hash() + } + #[inline(always)] + fn set_key(&mut self, new_value: &U512) { + *self = *new_value; + } +} diff --git a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs index 16b9d9dc98a7b..28fa3ebba8f03 100644 --- a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs +++ b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_final.rs @@ -17,8 +17,11 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_datablocks::HashMethod; +use common_datablocks::HashMethodKeysU128; use common_datablocks::HashMethodKeysU16; +use common_datablocks::HashMethodKeysU256; use common_datablocks::HashMethodKeysU32; +use common_datablocks::HashMethodKeysU512; use common_datablocks::HashMethodKeysU64; use common_datablocks::HashMethodKeysU8; use common_datablocks::HashMethodSerializer; @@ -45,6 +48,13 @@ pub type KeysU8FinalAggregator = FinalAggregator = FinalAggregator; pub type KeysU32FinalAggregator = FinalAggregator; pub type KeysU64FinalAggregator = FinalAggregator; +pub type KeysU128FinalAggregator = + FinalAggregator; +pub type KeysU256FinalAggregator = + FinalAggregator; +pub type KeysU512FinalAggregator = + FinalAggregator; + pub type SingleStringFinalAggregator = FinalAggregator; diff --git a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs index 397e5741c5d37..7eb4cec949ad2 100644 --- a/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs +++ b/query/src/pipelines/new/processors/transforms/aggregator/aggregator_partial.rs @@ -17,8 +17,11 @@ use std::sync::Arc; use bytes::BytesMut; use common_datablocks::DataBlock; use common_datablocks::HashMethod; +use common_datablocks::HashMethodKeysU128; use common_datablocks::HashMethodKeysU16; +use common_datablocks::HashMethodKeysU256; use common_datablocks::HashMethodKeysU32; +use common_datablocks::HashMethodKeysU512; use common_datablocks::HashMethodKeysU64; use common_datablocks::HashMethodKeysU8; use common_datablocks::HashMethodSerializer; @@ -47,6 +50,15 @@ pub type KeysU32PartialAggregator = PartialAggregator; pub type KeysU64PartialAggregator = PartialAggregator; +pub type KeysU128PartialAggregator = + PartialAggregator; + +pub type KeysU256PartialAggregator = + PartialAggregator; + +pub type KeysU512PartialAggregator = + PartialAggregator; + pub type SerializerPartialAggregator = PartialAggregator; pub type SingleStringPartialAggregator = diff --git a/query/src/pipelines/new/processors/transforms/aggregator/mod.rs b/query/src/pipelines/new/processors/transforms/aggregator/mod.rs index 402299b479975..57e328bb00ad4 100644 --- a/query/src/pipelines/new/processors/transforms/aggregator/mod.rs +++ b/query/src/pipelines/new/processors/transforms/aggregator/mod.rs @@ -18,16 +18,22 @@ mod aggregator_partial; mod aggregator_single_key; pub use aggregator_final::FinalAggregator; +pub use aggregator_final::KeysU128FinalAggregator; pub use aggregator_final::KeysU16FinalAggregator; +pub use aggregator_final::KeysU256FinalAggregator; pub use aggregator_final::KeysU32FinalAggregator; +pub use aggregator_final::KeysU512FinalAggregator; pub use aggregator_final::KeysU64FinalAggregator; pub use aggregator_final::KeysU8FinalAggregator; pub use aggregator_final::SerializerFinalAggregator; pub use aggregator_final::SingleStringFinalAggregator; pub use aggregator_params::AggregatorParams; pub use aggregator_params::AggregatorTransformParams; +pub use aggregator_partial::KeysU128PartialAggregator; pub use aggregator_partial::KeysU16PartialAggregator; +pub use aggregator_partial::KeysU256PartialAggregator; pub use aggregator_partial::KeysU32PartialAggregator; +pub use aggregator_partial::KeysU512PartialAggregator; pub use aggregator_partial::KeysU64PartialAggregator; pub use aggregator_partial::KeysU8PartialAggregator; pub use aggregator_partial::PartialAggregator; diff --git a/query/src/pipelines/new/processors/transforms/transform_aggregator.rs b/query/src/pipelines/new/processors/transforms/transform_aggregator.rs index 797edef8f9a9d..a326a9cc6d130 100644 --- a/query/src/pipelines/new/processors/transforms/transform_aggregator.rs +++ b/query/src/pipelines/new/processors/transforms/transform_aggregator.rs @@ -79,6 +79,21 @@ impl TransformAggregator { transform_params.transform_output_port, SerializerFinalAggregator::::create(ctx, method, aggregator_params)?, ), + HashMethodKind::KeysU128(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU128FinalAggregator::::create(ctx, method, aggregator_params)?, + ), + HashMethodKind::KeysU256(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU256FinalAggregator::::create(ctx, method, aggregator_params)?, + ), + HashMethodKind::KeysU512(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU512FinalAggregator::::create(ctx, method, aggregator_params)?, + ), }, false => match transform_params.method { HashMethodKind::KeysU8(method) => AggregatorTransform::create( @@ -111,6 +126,21 @@ impl TransformAggregator { transform_params.transform_output_port, SerializerFinalAggregator::::create(ctx, method, aggregator_params)?, ), + HashMethodKind::KeysU128(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU128FinalAggregator::::create(ctx, method, aggregator_params)?, + ), + HashMethodKind::KeysU256(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU256FinalAggregator::::create(ctx, method, aggregator_params)?, + ), + HashMethodKind::KeysU512(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU512FinalAggregator::::create(ctx, method, aggregator_params)?, + ), }, } } @@ -153,6 +183,21 @@ impl TransformAggregator { transform_params.transform_output_port, KeysU64PartialAggregator::::create(ctx, method, aggregator_params), ), + HashMethodKind::KeysU128(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU128PartialAggregator::::create(ctx, method, aggregator_params), + ), + HashMethodKind::KeysU256(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU256PartialAggregator::::create(ctx, method, aggregator_params), + ), + HashMethodKind::KeysU512(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU512PartialAggregator::::create(ctx, method, aggregator_params), + ), HashMethodKind::SingleString(method) => AggregatorTransform::create( transform_params.transform_input_port, transform_params.transform_output_port, @@ -185,6 +230,21 @@ impl TransformAggregator { transform_params.transform_output_port, KeysU64PartialAggregator::::create(ctx, method, aggregator_params), ), + HashMethodKind::KeysU128(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU128PartialAggregator::::create(ctx, method, aggregator_params), + ), + HashMethodKind::KeysU256(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU256PartialAggregator::::create(ctx, method, aggregator_params), + ), + HashMethodKind::KeysU512(method) => AggregatorTransform::create( + transform_params.transform_input_port, + transform_params.transform_output_port, + KeysU512PartialAggregator::::create(ctx, method, aggregator_params), + ), HashMethodKind::SingleString(method) => AggregatorTransform::create( transform_params.transform_input_port, transform_params.transform_output_port, diff --git a/query/src/pipelines/transforms/group_by/aggregator_groups_builder.rs b/query/src/pipelines/transforms/group_by/aggregator_groups_builder.rs index 66c50e84cf177..9262ba3ee8a84 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_groups_builder.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_groups_builder.rs @@ -19,7 +19,6 @@ use common_datavalues::DataField; use common_datavalues::DataType; use common_datavalues::MutableColumn; use common_datavalues::MutableStringColumn; -use common_datavalues::PrimitiveType; use common_datavalues::ScalarColumnBuilder; use common_datavalues::TypeDeserializer; use common_exception::Result; @@ -33,17 +32,13 @@ pub trait GroupColumnsBuilder { fn finish(self) -> Result>; } -pub struct FixedKeysGroupColumnsBuilder -where T: PrimitiveType -{ +pub struct FixedKeysGroupColumnsBuilder { data: Vec, groups_fields: Vec, } impl FixedKeysGroupColumnsBuilder -where - T: PrimitiveType, - for<'a> HashMethodFixedKeys: HashMethod = T>, +where for<'a> HashMethodFixedKeys: HashMethod = T> { pub fn create(capacity: usize, params: &AggregatorParams) -> Self { Self { @@ -53,10 +48,8 @@ where } } -impl GroupColumnsBuilder for FixedKeysGroupColumnsBuilder -where - T: PrimitiveType, - for<'a> HashMethodFixedKeys: HashMethod = T>, +impl GroupColumnsBuilder for FixedKeysGroupColumnsBuilder +where for<'a> HashMethodFixedKeys: HashMethod = T> { #[inline] fn append_value(&mut self, v: &T) { diff --git a/query/src/pipelines/transforms/group_by/aggregator_keys_builder.rs b/query/src/pipelines/transforms/group_by/aggregator_keys_builder.rs index efe3f5ca45fb5..741830856b3eb 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_keys_builder.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_keys_builder.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::marker::PhantomData; + use common_datablocks::HashMethod; use common_datablocks::HashMethodFixedKeys; use common_datavalues::prelude::*; @@ -62,3 +64,30 @@ impl KeysColumnBuilder for SerializedKeysColumnBuilder { } } } + +pub struct LargeFixedKeysColumnBuilder +where T: LargePrimitive +{ + pub inner_builder: MutableStringColumn, + pub _t: PhantomData, +} + +impl KeysColumnBuilder for LargeFixedKeysColumnBuilder +where + T: LargePrimitive, + for<'a> HashMethodFixedKeys: HashMethod = T>, +{ + #[inline] + fn finish(mut self) -> ColumnRef { + self.inner_builder.to_column() + } + + #[inline] + fn append_value(&mut self, v: &T) { + let values = self.inner_builder.values_mut(); + let new_len = values.len() + T::BYTE_SIZE; + values.resize(new_len, 0); + v.serialize_to(&mut values[new_len - T::BYTE_SIZE..]); + self.inner_builder.offsets_mut().push(new_len as i64); + } +} diff --git a/query/src/pipelines/transforms/group_by/aggregator_keys_iter.rs b/query/src/pipelines/transforms/group_by/aggregator_keys_iter.rs index b3d293628fb0a..dcf92886a32d0 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_keys_iter.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_keys_iter.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_datavalues::Column; +use common_datavalues::LargePrimitive; use common_datavalues::PrimitiveColumn; use common_datavalues::PrimitiveType; +use common_datavalues::ScalarColumn; use common_datavalues::StringColumn; use common_exception::Result; @@ -47,6 +50,33 @@ where T: PrimitiveType } } +pub struct LargeFixedKeysColumnIter +where T: LargePrimitive +{ + pub inner: Vec, +} + +impl LargeFixedKeysColumnIter +where T: LargePrimitive +{ + pub fn create(inner: &StringColumn) -> Result { + let mut result = Vec::with_capacity(inner.len()); + for bs in inner.scalar_iter() { + result.push(T::from_bytes(bs)?); + } + + Ok(Self { inner: result }) + } +} + +impl KeysColumnIter for LargeFixedKeysColumnIter +where T: LargePrimitive +{ + fn get_slice(&self) -> &[T] { + self.inner.as_slice() + } +} + pub struct SerializedKeysColumnIter { inner: Vec, #[allow(unused)] diff --git a/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs b/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs index aa62aec57fc94..093ea5ebea3dd 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_polymorphic_keys.rs @@ -12,18 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::marker::PhantomData; + use bumpalo::Bump; use common_datablocks::HashMethod; -use common_datablocks::HashMethodKeysU16; -use common_datablocks::HashMethodKeysU32; -use common_datablocks::HashMethodKeysU64; -use common_datablocks::HashMethodKeysU8; +use common_datablocks::HashMethodFixedKeys; +use common_datablocks::HashMethodKeysU128; +use common_datablocks::HashMethodKeysU256; +use common_datablocks::HashMethodKeysU512; use common_datablocks::HashMethodSerializer; use common_datablocks::HashMethodSingleString; use common_datavalues::prelude::*; use common_exception::Result; +use primitive_types::U256; +use primitive_types::U512; use super::aggregator_groups_builder::SingleStringGroupColumnsBuilder; +use super::aggregator_keys_builder::LargeFixedKeysColumnBuilder; +use super::aggregator_keys_iter::LargeFixedKeysColumnIter; use crate::common::HashMapKind; use crate::pipelines::new::processors::AggregatorParams; use crate::pipelines::transforms::group_by::aggregator_groups_builder::FixedKeysGroupColumnsBuilder; @@ -94,24 +100,21 @@ pub trait PolymorphicKeysHelper { ) -> Self::GroupColumnsBuilder; } -impl PolymorphicKeysHelper for HashMethodKeysU8 { +impl PolymorphicKeysHelper> for HashMethodFixedKeys { type State = ShortFixedKeysAggregatorState; fn aggregate_state(&self) -> Self::State { Self::State::create((u8::MAX as usize) + 1) } - type ColumnBuilder = FixedKeysColumnBuilder; fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { FixedKeysColumnBuilder:: { inner_builder: MutablePrimitiveColumn::::with_capacity(capacity), } } - type KeysColumnIter = FixedKeysColumnIter; fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { - FixedKeysColumnIter::create(Series::check_get::(column)?) + FixedKeysColumnIter::create(Series::check_get::>(column)?) } - type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; fn group_columns_builder( &self, @@ -122,24 +125,21 @@ impl PolymorphicKeysHelper for HashMethodKeysU8 { } } -impl PolymorphicKeysHelper for HashMethodKeysU16 { +impl PolymorphicKeysHelper> for HashMethodFixedKeys { type State = ShortFixedKeysAggregatorState; fn aggregate_state(&self) -> Self::State { Self::State::create((u16::MAX as usize) + 1) } - type ColumnBuilder = FixedKeysColumnBuilder; fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { FixedKeysColumnBuilder:: { inner_builder: MutablePrimitiveColumn::::with_capacity(capacity), } } - type KeysColumnIter = FixedKeysColumnIter; fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { - FixedKeysColumnIter::create(Series::check_get::(column)?) + FixedKeysColumnIter::create(Series::check_get::>(column)?) } - type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; fn group_columns_builder( &self, @@ -150,28 +150,21 @@ impl PolymorphicKeysHelper for HashMethodKeysU16 { } } -impl PolymorphicKeysHelper for HashMethodKeysU32 { +impl PolymorphicKeysHelper> for HashMethodFixedKeys { type State = LongerFixedKeysAggregatorState; fn aggregate_state(&self) -> Self::State { - LongerFixedKeysAggregatorState:: { - area: Bump::new(), - data: HashMapKind::create_hash_table(), - two_level_flag: false, - } + Self::State::default() } - type ColumnBuilder = FixedKeysColumnBuilder; fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { FixedKeysColumnBuilder:: { inner_builder: MutablePrimitiveColumn::::with_capacity(capacity), } } - type KeysColumnIter = FixedKeysColumnIter; fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { - FixedKeysColumnIter::create(Series::check_get::(column)?) + FixedKeysColumnIter::create(Series::check_get::>(column)?) } - type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; fn group_columns_builder( &self, @@ -182,28 +175,21 @@ impl PolymorphicKeysHelper for HashMethodKeysU32 { } } -impl PolymorphicKeysHelper for HashMethodKeysU64 { +impl PolymorphicKeysHelper> for HashMethodFixedKeys { type State = LongerFixedKeysAggregatorState; fn aggregate_state(&self) -> Self::State { - LongerFixedKeysAggregatorState:: { - area: Bump::new(), - data: HashMapKind::create_hash_table(), - two_level_flag: false, - } + Self::State::default() } - type ColumnBuilder = FixedKeysColumnBuilder; fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { FixedKeysColumnBuilder:: { inner_builder: MutablePrimitiveColumn::::with_capacity(capacity), } } - type KeysColumnIter = FixedKeysColumnIter; fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { - FixedKeysColumnIter::create(Series::check_get::(column)?) + FixedKeysColumnIter::create(Series::check_get::>(column)?) } - type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; fn group_columns_builder( &self, @@ -214,6 +200,93 @@ impl PolymorphicKeysHelper for HashMethodKeysU64 { } } +impl PolymorphicKeysHelper for HashMethodKeysU128 { + type State = LongerFixedKeysAggregatorState; + fn aggregate_state(&self) -> Self::State { + Self::State::default() + } + + type ColumnBuilder = LargeFixedKeysColumnBuilder; + fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { + LargeFixedKeysColumnBuilder { + inner_builder: MutableStringColumn::with_capacity(capacity * 16), + _t: PhantomData, + } + } + + type KeysColumnIter = LargeFixedKeysColumnIter; + fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { + LargeFixedKeysColumnIter::create(Series::check_get::(column)?) + } + + type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; + fn group_columns_builder( + &self, + capacity: usize, + params: &AggregatorParams, + ) -> Self::GroupColumnsBuilder { + FixedKeysGroupColumnsBuilder::create(capacity, params) + } +} + +impl PolymorphicKeysHelper for HashMethodKeysU256 { + type State = LongerFixedKeysAggregatorState; + fn aggregate_state(&self) -> Self::State { + Self::State::default() + } + + type ColumnBuilder = LargeFixedKeysColumnBuilder; + fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { + LargeFixedKeysColumnBuilder { + inner_builder: MutableStringColumn::with_capacity(capacity * 32), + _t: PhantomData, + } + } + + type KeysColumnIter = LargeFixedKeysColumnIter; + fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { + LargeFixedKeysColumnIter::create(Series::check_get::(column)?) + } + + type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; + fn group_columns_builder( + &self, + capacity: usize, + params: &AggregatorParams, + ) -> Self::GroupColumnsBuilder { + FixedKeysGroupColumnsBuilder::create(capacity, params) + } +} + +impl PolymorphicKeysHelper for HashMethodKeysU512 { + type State = LongerFixedKeysAggregatorState; + fn aggregate_state(&self) -> Self::State { + Self::State::default() + } + + type ColumnBuilder = LargeFixedKeysColumnBuilder; + fn keys_column_builder(&self, capacity: usize) -> Self::ColumnBuilder { + LargeFixedKeysColumnBuilder { + inner_builder: MutableStringColumn::with_capacity(capacity * 64), + _t: PhantomData, + } + } + + type KeysColumnIter = LargeFixedKeysColumnIter; + fn keys_iter_from_column(&self, column: &ColumnRef) -> Result { + LargeFixedKeysColumnIter::create(Series::check_get::(column)?) + } + + type GroupColumnsBuilder = FixedKeysGroupColumnsBuilder; + fn group_columns_builder( + &self, + capacity: usize, + params: &AggregatorParams, + ) -> Self::GroupColumnsBuilder { + FixedKeysGroupColumnsBuilder::create(capacity, params) + } +} + impl PolymorphicKeysHelper for HashMethodSingleString { type State = SerializedKeysAggregatorState; fn aggregate_state(&self) -> Self::State { diff --git a/query/src/pipelines/transforms/group_by/aggregator_state.rs b/query/src/pipelines/transforms/group_by/aggregator_state.rs index a748236139967..70bf97aeaca07 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_state.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_state.rs @@ -205,6 +205,16 @@ pub struct LongerFixedKeysAggregatorState { pub two_level_flag: bool, } +impl Default for LongerFixedKeysAggregatorState { + fn default() -> Self { + Self { + area: Bump::new(), + data: HashMapKind::create_hash_table(), + two_level_flag: false, + } + } +} + // TODO:(Winter) Hack: // The *mut KeyValueEntity needs to be used externally, but we can ensure that *mut KeyValueEntity // will not be used multiple async, so KeyValueEntity is Send @@ -216,9 +226,9 @@ unsafe impl Send for LongerFixedKeysAggregatorState< // will not be used multiple async, so KeyValueEntity is Sync unsafe impl Sync for LongerFixedKeysAggregatorState {} -impl AggregatorState> for LongerFixedKeysAggregatorState +impl AggregatorState> + for LongerFixedKeysAggregatorState where - T: PrimitiveType, for<'a> HashMethodFixedKeys: HashMethod = T>, for<'a> as HashMethod>::HashKey<'a>: HashTableKeyable, { diff --git a/query/src/pipelines/transforms/transform_group_by_final.rs b/query/src/pipelines/transforms/transform_group_by_final.rs index fb4bef5d9b511..2ea80e959d54e 100644 --- a/query/src/pipelines/transforms/transform_group_by_final.rs +++ b/query/src/pipelines/transforms/transform_group_by_final.rs @@ -22,6 +22,7 @@ use bumpalo::Bump; use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datablocks::HashMethodKind; +use common_datablocks::HashMethodSerializer; use common_datavalues::prelude::MutableColumn; use common_datavalues::prelude::*; use common_exception::ErrorCode; @@ -266,6 +267,10 @@ impl Processor for GroupByFinalTransform { HashMethodKind::KeysU64(hash_method) => { apply! { hash_method , &UInt64Column, RwLock> } } + _ => { + let method = HashMethodSerializer::default(); + apply! { method , &StringColumn, RwLock, usize, ahash::RandomState>> } + } } }}; } diff --git a/query/src/pipelines/transforms/transform_group_by_partial.rs b/query/src/pipelines/transforms/transform_group_by_partial.rs index a00fd62e157a9..c314787aea30d 100644 --- a/query/src/pipelines/transforms/transform_group_by_partial.rs +++ b/query/src/pipelines/transforms/transform_group_by_partial.rs @@ -19,6 +19,7 @@ use std::time::Instant; use common_datablocks::DataBlock; use common_datablocks::HashMethod; use common_datablocks::HashMethodKind; +use common_datablocks::HashMethodSerializer; use common_datavalues::prelude::*; use common_exception::Result; use common_planners::Expression; @@ -143,6 +144,10 @@ impl Processor for GroupByPartialTransform { HashMethodKind::KeysU64(method) => self.aggregate(method, group_cols).await, HashMethodKind::SingleString(method) => self.aggregate(method, group_cols).await, HashMethodKind::Serializer(method) => self.aggregate(method, group_cols).await, + _ => { + let method = HashMethodSerializer::default(); + self.aggregate(method, group_cols).await + } } } } diff --git a/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.result b/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.result index 0bac9e9034531..2703ffaa96730 100644 --- a/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.result +++ b/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.result @@ -3,3 +3,4 @@ 1000 3000 0 NULL NULL NULL +342 396 450 100 diff --git a/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.sql b/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.sql index 384472f8e89fb..5aea8e4d48d1c 100644 --- a/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.sql +++ b/tests/suites/0_stateless/03_dml/03_0001_select_aggregator.sql @@ -3,3 +3,4 @@ select count(*) = count(1) from numbers(1000); select count(1) from numbers(1000); select sum(3) from numbers(1000); select count(null), min(null), sum(null), avg(null) from numbers(1000); +select sum(a), sum(b), sum(c), sum(e) from ( select (number % 8)::UInt64 as a,(number % 9)::UInt64 as b,(number % 10)::UInt64 as c, count() as e from numbers(100) group by a ,b,c);