diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 2629bff864d7..ced210183185 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -56,6 +56,7 @@ use common_sql::executor::UnionAll; use common_sql::plans::JoinType; use common_sql::ColumnBinding; use common_sql::IndexType; +use common_storage::DataOperator; use super::processors::ProfileWrapper; use crate::api::ExchangeSorting; @@ -66,7 +67,9 @@ use crate::pipelines::processors::transforms::HashJoinDesc; use crate::pipelines::processors::transforms::PartialSingleStateAggregator; use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor; use crate::pipelines::processors::transforms::TransformAggregateSerializer; +use crate::pipelines::processors::transforms::TransformAggregateSpillWriter; use crate::pipelines::processors::transforms::TransformGroupBySerializer; +use crate::pipelines::processors::transforms::TransformGroupBySpillWriter; use crate::pipelines::processors::transforms::TransformLeftJoin; use crate::pipelines::processors::transforms::TransformMarkJoin; use crate::pipelines::processors::transforms::TransformMergeBlock; @@ -431,6 +434,44 @@ impl PipelineBuilder { } })?; + if self.ctx.get_cluster().is_empty() { + let operator = DataOperator::instance().operator(); + let location_prefix = format!("_aggregate_spill/{}", self.ctx.get_tenant()); + self.main_pipeline.add_transform(|input, output| { + let transform = match params.aggregate_functions.is_empty() { + true => with_mappedhash_method!(|T| match method.clone() { + HashMethodKind::T(method) => TransformGroupBySpillWriter::create( + input, + output, + method, + operator.clone(), + location_prefix.clone() + ), + }), + false => with_mappedhash_method!(|T| match method.clone() { + HashMethodKind::T(method) => TransformAggregateSpillWriter::create( + input, + output, + method, + operator.clone(), + params.clone(), + location_prefix.clone() + ), + }), + }; + + if self.enable_profiling { + Ok(ProcessorPtr::create(ProfileWrapper::create( + transform, + aggregate.plan_id, + self.prof_span_set.clone(), + ))) + } else { + Ok(ProcessorPtr::create(transform)) + } + })?; + } + if !self.ctx.get_cluster().is_empty() { // TODO: can serialize only when needed. 
self.main_pipeline.add_transform(|input, output| { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs index aae332578678..889780d55eea 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs @@ -65,6 +65,20 @@ impl HashTableCell { arena: Area::create(), } } + + pub fn len(&self) -> usize { + self.hashtable.len() + } + + pub fn allocated_bytes(&self) -> usize { + self.hashtable.bytes_len() + + self.arena.allocated_bytes() + + self + .arena_holders + .iter() + .map(ArenaHolder::allocated_bytes) + .sum::() + } } pub trait HashTableDropper { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_sorting.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_sorting.rs index 84e44cdfc244..12ef42738f8e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_sorting.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_sorting.rs @@ -21,6 +21,7 @@ use common_expression::DataBlock; use crate::api::ExchangeSorting; use crate::pipelines::processors::transforms::aggregator::serde::AggregateSerdeMeta; +use crate::pipelines::processors::transforms::aggregator::serde::BUCKET_TYPE; pub struct AggregateExchangeSorting {} @@ -38,7 +39,10 @@ impl ExchangeSorting for AggregateExchangeSorting { None => Err(ErrorCode::Internal( "Internal error, AggregateExchangeSorting only recv AggregateSerdeMeta", )), - Some(meta_info) => Ok(meta_info.bucket), + Some(meta_info) => match meta_info.typ == BUCKET_TYPE { + true => Ok(meta_info.bucket), + false => Ok(-1), + }, }, } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index 1075b9525284..e4c3d24b0555 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -43,9 +43,17 @@ impl SerializedPayload { } } +pub struct SpilledPayload { + pub bucket: isize, + pub location: String, + pub columns_layout: Vec, +} + pub enum AggregateMeta { Serialized(SerializedPayload), HashTable(HashTablePayload), + Spilling(HashTablePayload), + Spilled(SpilledPayload), Partitioned { bucket: isize, data: Vec }, } @@ -66,6 +74,26 @@ impl AggregateMeta) -> BlockMetaInfoPtr { + Box::new(AggregateMeta::::Spilling(HashTablePayload { + cell, + bucket, + arena_holder: ArenaHolder::create(None), + })) + } + + pub fn create_spilled( + bucket: isize, + location: String, + columns_layout: Vec, + ) -> BlockMetaInfoPtr { + Box::new(AggregateMeta::::Spilled(SpilledPayload { + bucket, + location, + columns_layout, + })) + } + pub fn create_partitioned(bucket: isize, data: Vec) -> BlockMetaInfoPtr { Box::new(AggregateMeta::::Partitioned { data, bucket }) } @@ -99,6 +127,8 @@ impl Debug for AggregateMeta AggregateMeta::Serialized { .. 
} => { f.debug_struct("AggregateMeta::Serialized").finish() } + AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(), + AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(), } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index 91698e2089c6..ebddedbd9779 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -41,5 +41,9 @@ pub use utils::*; pub use self::serde::TransformAggregateDeserializer; pub use self::serde::TransformAggregateSerializer; +pub use self::serde::TransformAggregateSpillReader; +pub use self::serde::TransformAggregateSpillWriter; pub use self::serde::TransformGroupByDeserializer; pub use self::serde::TransformGroupBySerializer; +pub use self::serde::TransformGroupBySpillReader; +pub use self::serde::TransformGroupBySpillWriter; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs index a11fd3bdb2d2..a801362f6fd9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs @@ -13,11 +13,21 @@ // limitations under the License. mod serde_meta; +mod transform_aggregate_serializer; +mod transform_aggregate_spill_writer; mod transform_deserializer; -mod transform_serializer; +mod transform_group_by_serializer; +mod transform_group_by_spill_writer; +mod transform_spill_reader; pub use serde_meta::AggregateSerdeMeta; +pub use serde_meta::BUCKET_TYPE; +pub use serde_meta::SPILLED_TYPE; +pub use transform_aggregate_serializer::TransformAggregateSerializer; +pub use transform_aggregate_spill_writer::TransformAggregateSpillWriter; pub use transform_deserializer::TransformAggregateDeserializer; pub use transform_deserializer::TransformGroupByDeserializer; -pub use transform_serializer::TransformAggregateSerializer; -pub use transform_serializer::TransformGroupBySerializer; +pub use transform_group_by_serializer::TransformGroupBySerializer; +pub use transform_group_by_spill_writer::TransformGroupBySpillWriter; +pub use transform_spill_reader::TransformAggregateSpillReader; +pub use transform_spill_reader::TransformGroupBySpillReader; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs index 18a5516cf342..4a3da49056ff 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs @@ -18,14 +18,39 @@ use common_expression::BlockMetaInfo; use common_expression::BlockMetaInfoDowncast; use common_expression::BlockMetaInfoPtr; +pub const BUCKET_TYPE: usize = 1; +pub const SPILLED_TYPE: usize = 2; + +// Cannot change to enum, because bincode cannot deserialize custom enum #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] pub struct AggregateSerdeMeta { + pub typ: usize, pub bucket: isize, + pub location: Option, + pub columns_layout: Vec, } impl AggregateSerdeMeta { pub fn create(bucket: isize) -> BlockMetaInfoPtr { - Box::new(AggregateSerdeMeta { bucket }) + Box::new(AggregateSerdeMeta { + typ: BUCKET_TYPE, + bucket, + 
location: None, + columns_layout: vec![], + }) + } + + pub fn create_spilled( + bucket: isize, + location: String, + columns_layout: Vec, + ) -> BlockMetaInfoPtr { + Box::new(AggregateSerdeMeta { + typ: SPILLED_TYPE, + bucket, + columns_layout, + location: Some(location), + }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs new file mode 100644 index 000000000000..cebf286b6655 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -0,0 +1,123 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_expression::types::string::StringColumnBuilder; +use common_expression::Column; +use common_expression::DataBlock; +use common_functions::aggregates::StateAddr; +use common_hashtable::HashtableEntryRefLike; +use common_hashtable::HashtableLike; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::BlockMetaTransform; +use common_pipeline_transforms::processors::transforms::BlockMetaTransformer; + +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload; +use crate::pipelines::processors::transforms::aggregator::estimated_key_size; +use crate::pipelines::processors::transforms::aggregator::serde::serde_meta::AggregateSerdeMeta; +use crate::pipelines::processors::transforms::group_by::HashMethodBounds; +use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; +use crate::pipelines::processors::AggregatorParams; + +pub struct TransformAggregateSerializer { + method: Method, + params: Arc, +} + +impl TransformAggregateSerializer { + pub fn try_create( + input: Arc, + output: Arc, + method: Method, + params: Arc, + ) -> Result { + Ok(ProcessorPtr::create(BlockMetaTransformer::create( + input, + output, + TransformAggregateSerializer { method, params }, + ))) + } +} + +impl BlockMetaTransform> + for TransformAggregateSerializer +where Method: HashMethodBounds +{ + const NAME: &'static str = "TransformAggregateSerializer"; + + fn transform(&mut self, meta: AggregateMeta) -> Result { + match meta { + AggregateMeta::Spilling(_) => unreachable!(), + AggregateMeta::Partitioned { .. 
} => unreachable!(), + AggregateMeta::Serialized(_) => unreachable!(), + AggregateMeta::Spilled(payload) => Ok(DataBlock::empty_with_meta( + AggregateSerdeMeta::create_spilled( + payload.bucket, + payload.location, + payload.columns_layout, + ), + )), + AggregateMeta::HashTable(payload) => { + let bucket = payload.bucket; + let data_block = serialize_aggregate(&self.method, &self.params, payload)?; + data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))) + } + } + } +} + +pub fn serialize_aggregate( + method: &Method, + params: &Arc, + payload: HashTablePayload, +) -> Result { + let keys_len = payload.cell.hashtable.len(); + let value_size = estimated_key_size(&payload.cell.hashtable); + + let funcs = ¶ms.aggregate_functions; + let offsets_aggregate_states = ¶ms.offsets_aggregate_states; + + // Builders. + let mut state_builders = (0..funcs.len()) + .map(|_| StringColumnBuilder::with_capacity(keys_len, keys_len * 4)) + .collect::>(); + + let mut group_key_builder = method.keys_column_builder(keys_len, value_size); + + for group_entity in payload.cell.hashtable.iter() { + let place = Into::::into(*group_entity.get()); + + for (idx, func) in funcs.iter().enumerate() { + let arg_place = place.next(offsets_aggregate_states[idx]); + func.serialize(arg_place, &mut state_builders[idx].data)?; + state_builders[idx].commit_row(); + } + + group_key_builder.append_value(group_entity.key()); + } + + let mut columns = Vec::with_capacity(state_builders.len() + 1); + + for builder in state_builders.into_iter() { + columns.push(Column::String(builder.build())); + } + + columns.push(group_key_builder.finish()); + Ok(DataBlock::new_from_columns(columns)) +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs new file mode 100644 index 000000000000..98178679de8c --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
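//
// Editor's note (not part of the patch): TransformAggregateSpillWriter is the write
// side of the new spill path, wired in by pipeline_builder.rs only when the query runs
// in stand-alone (non-cluster) mode. It forwards ordinary blocks unchanged and
// intercepts AggregateMeta::Spilling payloads: process() serializes the bucket's hash
// table into columns, async_process() concatenates the serialized column buffers into a
// single object under location_prefix, and the processor then emits an
// AggregateMeta::Spilled block carrying the object location and the per-column byte
// lengths (columns_layout).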
+ +use std::any::Any; +use std::sync::Arc; +use std::time::Instant; + +use common_base::base::GlobalUniqName; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::arrow::serialize_column; +use common_expression::BlockEntry; +use common_expression::BlockMetaInfoDowncast; +use common_expression::BlockMetaInfoPtr; +use common_expression::DataBlock; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::Processor; +use opendal::Operator; +use tracing::info; + +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::serde::transform_aggregate_serializer::serialize_aggregate; +use crate::pipelines::processors::transforms::group_by::HashMethodBounds; +use crate::pipelines::processors::AggregatorParams; + +pub struct TransformAggregateSpillWriter { + method: Method, + input: Arc, + output: Arc, + params: Arc, + + operator: Operator, + location_prefix: String, + spilled_meta: Option, + spilling_meta: Option>, + writing_data_block: Option<(isize, usize, Vec>)>, +} + +impl TransformAggregateSpillWriter { + pub fn create( + input: Arc, + output: Arc, + method: Method, + operator: Operator, + params: Arc, + location_prefix: String, + ) -> Box { + Box::new(TransformAggregateSpillWriter:: { + method, + input, + output, + params, + operator, + location_prefix, + spilled_meta: None, + spilling_meta: None, + writing_data_block: None, + }) + } +} + +#[async_trait::async_trait] +impl Processor for TransformAggregateSpillWriter { + fn name(&self) -> String { + String::from("TransformAggregateSpillWriter") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(spilled_meta) = self.spilled_meta.take() { + self.output + .push_data(Ok(DataBlock::empty_with_meta(spilled_meta))); + return Ok(Event::NeedConsume); + } + + if self.writing_data_block.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Async); + } + + if self.spilling_meta.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Sync); + } + + if self.input.has_data() { + let mut data_block = self.input.pull_data().unwrap()?; + + if let Some(block_meta) = data_block + .get_meta() + .and_then(AggregateMeta::::downcast_ref_from) + { + if matches!(block_meta, AggregateMeta::Spilling(_)) { + self.input.set_not_need_data(); + let block_meta = data_block.take_meta().unwrap(); + self.spilling_meta = AggregateMeta::::downcast_from(block_meta); + return Ok(Event::Sync); + } + } + + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(spilling_meta) = self.spilling_meta.take() { + if let AggregateMeta::Spilling(payload) = spilling_meta { + let bucket = payload.bucket; + let data_block = serialize_aggregate(&self.method, &self.params, payload)?; + let columns = get_columns(data_block); + + let mut total_size = 0; + let mut columns_data = Vec::with_capacity(columns.len()); + for column in 
columns.into_iter() { + let column = column.value.as_column().unwrap(); + let column_data = serialize_column(column); + total_size += column_data.len(); + columns_data.push(column_data); + } + + self.writing_data_block = Some((bucket, total_size, columns_data)); + return Ok(()); + } + + return Err(ErrorCode::Internal("")); + } + + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + if let Some((bucket, total_size, data)) = self.writing_data_block.take() { + let instant = Instant::now(); + let unique_name = GlobalUniqName::unique(); + let location = format!("{}/{}", self.location_prefix, unique_name); + let object = self.operator.object(&location); + + // temp code: waiting https://github.com/datafuselabs/opendal/pull/1431 + let mut write_data = Vec::with_capacity(total_size); + let mut columns_layout = Vec::with_capacity(data.len()); + + for data in data.into_iter() { + columns_layout.push(data.len()); + write_data.extend(data); + } + + object.write(write_data).await?; + info!( + "Write aggregate spill {} successfully, elapsed: {:?}", + &location, + instant.elapsed() + ); + self.spilled_meta = Some(AggregateMeta::::create_spilled( + bucket, + location, + columns_layout, + )); + } + + Ok(()) + } +} + +fn get_columns(data_block: DataBlock) -> Vec { + data_block.columns().to_vec() +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index cc125f2611a6..933b6e5381c7 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -27,6 +27,7 @@ use common_pipeline_core::processors::Processor; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::serde::serde_meta::AggregateSerdeMeta; +use crate::pipelines::processors::transforms::aggregator::serde::BUCKET_TYPE; use crate::pipelines::processors::transforms::group_by::HashMethodBounds; pub struct TransformDeserializer { @@ -75,14 +76,22 @@ where if self.input.has_data() { let mut data_block = self.input.pull_data().unwrap()?; - let block_meta = data_block + let meta = data_block .take_meta() .and_then(AggregateSerdeMeta::downcast_from) .unwrap(); self.output.push_data(Ok(DataBlock::empty_with_meta( - AggregateMeta::::create_serialized(block_meta.bucket, data_block), + match meta.typ == BUCKET_TYPE { + true => AggregateMeta::::create_serialized(meta.bucket, data_block), + false => AggregateMeta::::create_spilled( + meta.bucket, + meta.location.unwrap(), + meta.columns_layout, + ), + }, ))); + return Ok(Event::NeedConsume); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs new file mode 100644 index 000000000000..b9664dd66ccd --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_expression::DataBlock; +use common_hashtable::HashtableEntryRefLike; +use common_hashtable::HashtableLike; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::BlockMetaTransform; +use common_pipeline_transforms::processors::transforms::BlockMetaTransformer; + +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload; +use crate::pipelines::processors::transforms::aggregator::estimated_key_size; +use crate::pipelines::processors::transforms::aggregator::serde::AggregateSerdeMeta; +use crate::pipelines::processors::transforms::group_by::HashMethodBounds; +use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; + +pub struct TransformGroupBySerializer { + method: Method, +} + +impl TransformGroupBySerializer { + pub fn try_create( + input: Arc, + output: Arc, + method: Method, + ) -> common_exception::Result { + Ok(ProcessorPtr::create(BlockMetaTransformer::create( + input, + output, + TransformGroupBySerializer { method }, + ))) + } +} + +impl BlockMetaTransform> for TransformGroupBySerializer +where Method: HashMethodBounds +{ + const NAME: &'static str = "TransformGroupBySerializer"; + + fn transform( + &mut self, + meta: AggregateMeta, + ) -> common_exception::Result { + match meta { + AggregateMeta::Spilling(_) => unreachable!(), + AggregateMeta::Partitioned { .. } => unreachable!(), + AggregateMeta::Serialized(_) => unreachable!(), + AggregateMeta::Spilled(payload) => Ok(DataBlock::empty_with_meta( + AggregateSerdeMeta::create_spilled( + payload.bucket, + payload.location, + payload.columns_layout, + ), + )), + AggregateMeta::HashTable(payload) => { + let bucket = payload.bucket; + let data_block = serialize_group_by(&self.method, payload)?; + data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))) + } + } + } +} + +pub fn serialize_group_by( + method: &Method, + payload: HashTablePayload, +) -> common_exception::Result { + let keys_len = payload.cell.hashtable.len(); + let value_size = estimated_key_size(&payload.cell.hashtable); + let mut group_key_builder = method.keys_column_builder(keys_len, value_size); + + for group_entity in payload.cell.hashtable.iter() { + group_key_builder.append_value(group_entity.key()); + } + + Ok(DataBlock::new_from_columns(vec![ + group_key_builder.finish(), + ])) +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs new file mode 100644 index 000000000000..32d09a0e2e68 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs @@ -0,0 +1,198 @@ +// Copyright 2023 Datafuse Labs. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; +use std::time::Instant; + +use common_base::base::GlobalUniqName; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::arrow::serialize_column; +use common_expression::BlockEntry; +use common_expression::BlockMetaInfoDowncast; +use common_expression::BlockMetaInfoPtr; +use common_expression::DataBlock; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::Processor; +use opendal::Operator; +use tracing::info; + +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::serde::transform_group_by_serializer::serialize_group_by; +use crate::pipelines::processors::transforms::group_by::HashMethodBounds; + +pub struct TransformGroupBySpillWriter { + method: Method, + input: Arc, + output: Arc, + + operator: Operator, + location_prefix: String, + spilled_meta: Option, + spilling_meta: Option>, + writing_data_block: Option<(isize, usize, Vec>)>, +} + +impl TransformGroupBySpillWriter { + pub fn create( + input: Arc, + output: Arc, + method: Method, + operator: Operator, + location_prefix: String, + ) -> Box { + Box::new(TransformGroupBySpillWriter:: { + method, + input, + output, + operator, + location_prefix, + spilled_meta: None, + spilling_meta: None, + writing_data_block: None, + }) + } +} + +#[async_trait::async_trait] +impl Processor for TransformGroupBySpillWriter { + fn name(&self) -> String { + String::from("TransformGroupBySpillWriter") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(spilled_meta) = self.spilled_meta.take() { + self.output + .push_data(Ok(DataBlock::empty_with_meta(spilled_meta))); + return Ok(Event::NeedConsume); + } + + if self.writing_data_block.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Async); + } + + if self.spilling_meta.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Sync); + } + + if self.input.has_data() { + let mut data_block = self.input.pull_data().unwrap()?; + + if let Some(block_meta) = data_block + .get_meta() + .and_then(AggregateMeta::::downcast_ref_from) + { + if matches!(block_meta, AggregateMeta::Spilling(_)) { + self.input.set_not_need_data(); + let block_meta = data_block.take_meta().unwrap(); + self.spilling_meta = AggregateMeta::::downcast_from(block_meta); + return Ok(Event::Sync); + } + } + + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + 
self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(spilling_meta) = self.spilling_meta.take() { + if let AggregateMeta::Spilling(payload) = spilling_meta { + let bucket = payload.bucket; + let data_block = serialize_group_by(&self.method, payload)?; + let columns = get_columns(data_block); + + let mut total_size = 0; + let mut columns_data = Vec::with_capacity(columns.len()); + for column in columns.into_iter() { + let column = column.value.as_column().unwrap(); + let column_data = serialize_column(column); + total_size += column_data.len(); + columns_data.push(column_data); + } + + self.writing_data_block = Some((bucket, total_size, columns_data)); + return Ok(()); + } + + return Err(ErrorCode::Internal("")); + } + + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + if let Some((bucket, total_size, data)) = self.writing_data_block.take() { + let instant = Instant::now(); + let unique_name = GlobalUniqName::unique(); + let location = format!("{}/{}", self.location_prefix, unique_name); + let object = self.operator.object(&location); + + // temp code: waiting https://github.com/datafuselabs/opendal/pull/1431 + let mut write_data = Vec::with_capacity(total_size); + let mut columns_layout = Vec::with_capacity(data.len()); + + for data in data.into_iter() { + columns_layout.push(data.len()); + write_data.extend(data); + } + + object.write(write_data).await?; + info!( + "Write aggregate spill {} successfully, elapsed: {:?}", + &location, + instant.elapsed() + ); + + self.spilled_meta = Some(AggregateMeta::::create_spilled( + bucket, + location, + columns_layout, + )); + } + + Ok(()) + } +} + +fn get_columns(data_block: DataBlock) -> Vec { + data_block.columns().to_vec() +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_serializer.rs deleted file mode 100644 index cedcdc864d10..000000000000 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_serializer.rs +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2023 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; - -use common_exception::Result; -use common_expression::types::string::StringColumnBuilder; -use common_expression::Column; -use common_expression::DataBlock; -use common_functions::aggregates::StateAddr; -use common_hashtable::HashtableEntryRefLike; -use common_hashtable::HashtableLike; -use common_pipeline_core::processors::port::InputPort; -use common_pipeline_core::processors::port::OutputPort; -use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::BlockMetaTransform; -use common_pipeline_transforms::processors::transforms::BlockMetaTransformer; - -use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; -use crate::pipelines::processors::transforms::aggregator::estimated_key_size; -use crate::pipelines::processors::transforms::aggregator::serde::serde_meta::AggregateSerdeMeta; -use crate::pipelines::processors::transforms::group_by::HashMethodBounds; -use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; -use crate::pipelines::processors::AggregatorParams; - -pub struct TransformGroupBySerializer { - method: Method, -} - -impl TransformGroupBySerializer { - pub fn try_create( - input: Arc, - output: Arc, - method: Method, - ) -> Result { - Ok(ProcessorPtr::create(BlockMetaTransformer::create( - input, - output, - TransformGroupBySerializer { method }, - ))) - } -} - -impl BlockMetaTransform> for TransformGroupBySerializer -where Method: HashMethodBounds -{ - const NAME: &'static str = "TransformGroupBySerializer"; - - fn transform(&mut self, meta: AggregateMeta) -> Result { - match meta { - AggregateMeta::Partitioned { .. } => unreachable!(), - AggregateMeta::Serialized(_) => unreachable!(), - AggregateMeta::HashTable(payload) => { - let keys_len = payload.cell.hashtable.len(); - let value_size = estimated_key_size(&payload.cell.hashtable); - let mut group_key_builder = self.method.keys_column_builder(keys_len, value_size); - - for group_entity in payload.cell.hashtable.iter() { - group_key_builder.append_value(group_entity.key()); - } - - let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]); - data_block.add_meta(Some(AggregateSerdeMeta::create(payload.bucket))) - } - } - } -} - -pub struct TransformAggregateSerializer { - method: Method, - params: Arc, -} - -impl TransformAggregateSerializer { - pub fn try_create( - input: Arc, - output: Arc, - method: Method, - params: Arc, - ) -> Result { - Ok(ProcessorPtr::create(BlockMetaTransformer::create( - input, - output, - TransformAggregateSerializer { method, params }, - ))) - } -} - -impl BlockMetaTransform> - for TransformAggregateSerializer -where Method: HashMethodBounds -{ - const NAME: &'static str = "TransformAggregateSerializer"; - - fn transform(&mut self, meta: AggregateMeta) -> Result { - match meta { - AggregateMeta::Partitioned { .. } => unreachable!(), - AggregateMeta::Serialized(_) => unreachable!(), - AggregateMeta::HashTable(payload) => { - let keys_len = payload.cell.hashtable.len(); - let value_size = estimated_key_size(&payload.cell.hashtable); - - let funcs = &self.params.aggregate_functions; - let offsets_aggregate_states = &self.params.offsets_aggregate_states; - - // Builders. 
- let mut state_builders = (0..funcs.len()) - .map(|_| StringColumnBuilder::with_capacity(keys_len, keys_len * 4)) - .collect::>(); - - let mut group_key_builder = self.method.keys_column_builder(keys_len, value_size); - - for group_entity in payload.cell.hashtable.iter() { - let place = Into::::into(*group_entity.get()); - - for (idx, func) in funcs.iter().enumerate() { - let arg_place = place.next(offsets_aggregate_states[idx]); - func.serialize(arg_place, &mut state_builders[idx].data)?; - state_builders[idx].commit_row(); - } - - group_key_builder.append_value(group_entity.key()); - } - - let mut columns = Vec::with_capacity(state_builders.len() + 1); - - for builder in state_builders.into_iter() { - columns.push(Column::String(builder.build())); - } - - columns.push(group_key_builder.finish()); - let data_block = DataBlock::new_from_columns(columns); - data_block.add_meta(Some(AggregateSerdeMeta::create(payload.bucket))) - } - } - } -} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs new file mode 100644 index 000000000000..31b41e28b62a --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -0,0 +1,264 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
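//
// Editor's note (not part of the patch): TransformSpillReader (aliased below as
// TransformGroupBySpillReader and TransformAggregateSpillReader) is the read side of
// the spill path, added in build_partition_bucket ahead of the final aggregation.
// event() intercepts AggregateMeta::Spilled blocks, as well as Partitioned metas that
// contain Spilled entries; async_process() reads the spilled objects back (one tokio
// task per location in the partitioned case); process() slices each buffer according
// to columns_layout, rebuilds the columns, and re-emits the data as
// AggregateMeta::Serialized for the final aggregator.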
+ +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Instant; + +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::arrow::deserialize_column; +use common_expression::BlockMetaInfoDowncast; +use common_expression::BlockMetaInfoPtr; +use common_expression::DataBlock; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use itertools::Itertools; +use opendal::Operator; +use tracing::info; + +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::SerializedPayload; +use crate::pipelines::processors::transforms::aggregator::aggregate_meta::SpilledPayload; +use crate::pipelines::processors::transforms::group_by::HashMethodBounds; + +type DeserializingMeta = (AggregateMeta, VecDeque>); + +pub struct TransformSpillReader { + input: Arc, + output: Arc, + + operator: Operator, + deserialized_meta: Option, + reading_meta: Option>, + deserializing_meta: Option>, +} + +#[async_trait::async_trait] +impl Processor + for TransformSpillReader +{ + fn name(&self) -> String { + String::from("TransformSpillReader") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(deserialized_meta) = self.deserialized_meta.take() { + self.output + .push_data(Ok(DataBlock::empty_with_meta(deserialized_meta))); + return Ok(Event::NeedConsume); + } + + if self.deserializing_meta.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Sync); + } + + if self.reading_meta.is_some() { + self.input.set_not_need_data(); + return Ok(Event::Async); + } + + if self.input.has_data() { + let mut data_block = self.input.pull_data().unwrap()?; + + if let Some(block_meta) = data_block + .get_meta() + .and_then(AggregateMeta::::downcast_ref_from) + { + if matches!(block_meta, AggregateMeta::Spilled(_)) { + self.input.set_not_need_data(); + let block_meta = data_block.take_meta().unwrap(); + self.reading_meta = AggregateMeta::::downcast_from(block_meta); + return Ok(Event::Async); + } + + if let AggregateMeta::Partitioned { data, .. 
} = block_meta { + for meta in data { + if matches!(meta, AggregateMeta::Spilled(_)) { + self.input.set_not_need_data(); + let block_meta = data_block.take_meta().unwrap(); + self.reading_meta = + AggregateMeta::::downcast_from(block_meta); + return Ok(Event::Async); + } + } + } + } + + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some((meta, mut read_data)) = self.deserializing_meta.take() { + match meta { + AggregateMeta::Spilling(_) => unreachable!(), + AggregateMeta::HashTable(_) => unreachable!(), + AggregateMeta::Serialized(_) => unreachable!(), + AggregateMeta::Spilled(payload) => { + debug_assert!(read_data.len() == 1); + let data = read_data.pop_front().unwrap(); + + self.deserialized_meta = Some(Box::new(Self::deserialize(payload, data))); + } + AggregateMeta::Partitioned { bucket, data } => { + let mut new_data = Vec::with_capacity(data.len()); + + for meta in data { + if matches!(&meta, AggregateMeta::Spilled(_)) { + if let AggregateMeta::Spilled(payload) = meta { + let data = read_data.pop_front().unwrap(); + new_data.push(Self::deserialize(payload, data)); + } + + continue; + } + + new_data.push(meta); + } + + self.deserialized_meta = Some(AggregateMeta::::create_partitioned( + bucket, new_data, + )); + } + } + } + + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + if let Some(block_meta) = self.reading_meta.take() { + match &block_meta { + AggregateMeta::Spilling(_) => unreachable!(), + AggregateMeta::HashTable(_) => unreachable!(), + AggregateMeta::Serialized(_) => unreachable!(), + AggregateMeta::Spilled(payload) => { + // TODO: need retry read + let instant = Instant::now(); + let object = self.operator.object(&payload.location); + let data = object.read().await?; + info!( + "Read aggregate spill {} successfully, elapsed: {:?}", + &payload.location, + instant.elapsed() + ); + // TODO: can remove this location + self.deserializing_meta = Some((block_meta, VecDeque::from(vec![data]))); + } + AggregateMeta::Partitioned { data, .. 
} => { + let mut read_data = Vec::with_capacity(data.len()); + for meta in data { + // TODO: need retry read + if let AggregateMeta::Spilled(payload) = meta { + let location = payload.location.clone(); + let operator = self.operator.clone(); + read_data.push(common_base::base::tokio::spawn(async move { + let instant = Instant::now(); + let object = operator.object(&location); + let data = object.read().await?; + info!( + "Read aggregate spill {} successfully, elapsed: {:?}", + location, + instant.elapsed() + ); + Ok(data) + // TODO: can remove this location + })); + } + } + + match futures::future::try_join_all(read_data).await { + Err(_) => { + return Err(ErrorCode::TokioError("Cannot join tokio job")); + } + Ok(read_data) => { + let read_data: Result>, opendal::Error> = + read_data.into_iter().try_collect(); + + self.deserializing_meta = Some((block_meta, read_data?)); + } + }; + } + } + } + + Ok(()) + } +} + +impl TransformSpillReader { + pub fn create( + input: Arc, + output: Arc, + operator: Operator, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(TransformSpillReader::< + Method, + V, + > { + input, + output, + operator, + deserialized_meta: None, + reading_meta: None, + deserializing_meta: None, + }))) + } + + fn deserialize(payload: SpilledPayload, data: Vec) -> AggregateMeta { + let mut begin = 0; + let mut columns = Vec::with_capacity(payload.columns_layout.len()); + for column_layout in payload.columns_layout { + columns.push(deserialize_column(&data[begin..begin + column_layout]).unwrap()); + begin += column_layout; + } + + AggregateMeta::::Serialized(SerializedPayload { + bucket: payload.bucket, + data_block: DataBlock::new_from_columns(columns), + }) + } +} + +pub type TransformGroupBySpillReader = TransformSpillReader; +pub type TransformAggregateSpillReader = TransformSpillReader; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index c16189f47fed..8c235eca5ed3 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -74,6 +74,8 @@ where Method: HashMethodBounds for bucket_data in data { match bucket_data { + AggregateMeta::Spilled(_) => unreachable!(), + AggregateMeta::Spilling(_) => unreachable!(), AggregateMeta::Partitioned { .. 
} => unreachable!(), AggregateMeta::Serialized(payload) => { debug_assert!(bucket == payload.bucket); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index 52000529440b..5b253c809556 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_cell::Aggreg use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; use crate::pipelines::processors::transforms::group_by::HashMethodBounds; use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod; +use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; use crate::pipelines::processors::transforms::HashTableCell; use crate::pipelines::processors::transforms::PartitionedHashTableDropper; use crate::pipelines::processors::AggregatorParams; @@ -52,20 +53,29 @@ impl Default for HashTable { } } -struct GroupBySettings { +struct AggregateSettings { convert_threshold: usize, spilling_bytes_threshold_per_proc: usize, } -impl TryFrom> for GroupBySettings { +impl TryFrom> for AggregateSettings { type Error = ErrorCode; fn try_from(ctx: Arc) -> std::result::Result { let settings = ctx.get_settings(); let convert_threshold = settings.get_group_by_two_level_threshold()? as usize; - Ok(GroupBySettings { + let mut spilling_bytes_threshold_per_proc = usize::MAX; + + if ctx.get_cluster().is_empty() { + let value = settings.get_spilling_bytes_threshold_per_proc()?; + if value != 0 { + spilling_bytes_threshold_per_proc = value; + } + } + + Ok(AggregateSettings { convert_threshold, - spilling_bytes_threshold_per_proc: usize::MAX, + spilling_bytes_threshold_per_proc, }) } } @@ -73,7 +83,7 @@ impl TryFrom> for GroupBySettings { // SELECT column_name, agg(xxx) FROM table_name GROUP BY column_name pub struct TransformPartialAggregate { method: Method, - settings: GroupBySettings, + settings: AggregateSettings, hash_table: HashTable, params: Arc, @@ -106,7 +116,7 @@ impl TransformPartialAggregate { method, params, hash_table, - settings: GroupBySettings::try_from(ctx)?, + settings: AggregateSettings::try_from(ctx)?, }, )) } @@ -232,8 +242,8 @@ impl AccumulatingTransform for TransformPartialAggrega #[allow(clippy::collapsible_if)] if Method::SUPPORT_PARTITIONED { if matches!(&self.hash_table, HashTable::HashTable(cell) - if cell.hashtable.len() >= self.settings.convert_threshold || - cell.hashtable.bytes_len() >= self.settings.spilling_bytes_threshold_per_proc + if cell.len() >= self.settings.convert_threshold || + cell.allocated_bytes() >= self.settings.spilling_bytes_threshold_per_proc ) { if let HashTable::HashTable(cell) = std::mem::take(&mut self.hash_table) { self.hash_table = HashTable::PartitionedHashTable( @@ -241,6 +251,35 @@ impl AccumulatingTransform for TransformPartialAggrega ); } } + + if matches!(&self.hash_table, HashTable::PartitionedHashTable(cell) if cell.allocated_bytes() > self.settings.spilling_bytes_threshold_per_proc) + { + if let HashTable::PartitionedHashTable(v) = std::mem::take(&mut self.hash_table) { + let _dropper = v._dropper.clone(); + let cells = PartitionedHashTableDropper::split_cell(v); + let mut blocks = Vec::with_capacity(cells.len()); + for (bucket, cell) in 
cells.into_iter().enumerate() { + if cell.hashtable.len() != 0 { + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::::create_spilling( + bucket as isize, + cell, + ), + )); + } + } + + let method = PartitionedHashMethod::::create(self.method.clone()); + let new_hashtable = method.create_hash_table()?; + self.hash_table = HashTable::PartitionedHashTable(HashTableCell::create( + new_hashtable, + _dropper.unwrap(), + )); + return Ok(blocks); + } + + unreachable!() + } } Ok(vec![]) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs index 70c411347a89..23db118c83ad 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs @@ -62,6 +62,8 @@ where Method: HashMethodBounds let mut hashtable = self.method.create_hash_table::<()>()?; 'merge_hashtable: for bucket_data in data { match bucket_data { + AggregateMeta::Spilled(_) => unreachable!(), + AggregateMeta::Spilling(_) => unreachable!(), AggregateMeta::Partitioned { .. } => unreachable!(), AggregateMeta::Serialized(payload) => { debug_assert!(bucket == payload.bucket); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs index 2640eabcb1c5..62d4ffe7dc36 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs @@ -32,6 +32,7 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_cell::HashTa use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; use crate::pipelines::processors::transforms::group_by::HashMethodBounds; use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod; +use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; use crate::pipelines::processors::transforms::PartitionedHashTableDropper; use crate::pipelines::processors::AggregatorParams; use crate::sessions::QueryContext; @@ -60,9 +61,18 @@ impl TryFrom> for GroupBySettings { fn try_from(ctx: Arc) -> std::result::Result { let settings = ctx.get_settings(); let convert_threshold = settings.get_group_by_two_level_threshold()? 
as usize; + let mut spilling_bytes_threshold_per_proc = usize::MAX; + + if ctx.get_cluster().is_empty() { + let value = settings.get_spilling_bytes_threshold_per_proc()?; + if value != 0 { + spilling_bytes_threshold_per_proc = value; + } + } + Ok(GroupBySettings { convert_threshold, - spilling_bytes_threshold_per_proc: usize::MAX, + spilling_bytes_threshold_per_proc, }) } } @@ -137,8 +147,8 @@ impl AccumulatingTransform for TransformPartialGroupBy #[allow(clippy::collapsible_if)] if Method::SUPPORT_PARTITIONED { if matches!(&self.hash_table, HashTable::HashTable(cell) - if cell.hashtable.len() >= self.settings.convert_threshold || - cell.hashtable.bytes_len() >= self.settings.spilling_bytes_threshold_per_proc + if cell.len() >= self.settings.convert_threshold || + cell.allocated_bytes() >= self.settings.spilling_bytes_threshold_per_proc ) { if let HashTable::HashTable(cell) = std::mem::take(&mut self.hash_table) { self.hash_table = HashTable::PartitionedHashTable( @@ -146,6 +156,36 @@ impl AccumulatingTransform for TransformPartialGroupBy ); } } + + if matches!(&self.hash_table, HashTable::PartitionedHashTable(cell) if cell.allocated_bytes() > self.settings.spilling_bytes_threshold_per_proc) + { + if let HashTable::PartitionedHashTable(v) = std::mem::take(&mut self.hash_table) + { + let _dropper = v._dropper.clone(); + let cells = PartitionedHashTableDropper::split_cell(v); + let mut blocks = Vec::with_capacity(cells.len()); + for (bucket, cell) in cells.into_iter().enumerate() { + if cell.hashtable.len() != 0 { + blocks.push(DataBlock::empty_with_meta( + AggregateMeta::::create_spilling( + bucket as isize, + cell, + ), + )); + } + } + + let method = PartitionedHashMethod::::create(self.method.clone()); + let new_hashtable = method.create_hash_table()?; + self.hash_table = HashTable::PartitionedHashTable(HashTableCell::create( + new_hashtable, + _dropper.unwrap(), + )); + return Ok(blocks); + } + + unreachable!() + } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs index fd3e3dbe95e9..fcddb1635e1a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs @@ -36,6 +36,7 @@ use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; use common_pipeline_core::Pipeline; +use common_storage::DataOperator; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload; @@ -46,8 +47,10 @@ use crate::pipelines::processors::transforms::group_by::HashMethodBounds; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod; use crate::pipelines::processors::transforms::PartitionedHashTableDropper; +use crate::pipelines::processors::transforms::TransformAggregateSpillReader; use crate::pipelines::processors::transforms::TransformFinalAggregate; use crate::pipelines::processors::transforms::TransformGroupByDeserializer; +use crate::pipelines::processors::transforms::TransformGroupBySpillReader; use crate::pipelines::processors::AggregatorParams; use crate::sessions::QueryContext; @@ 
-146,10 +149,12 @@ impl fn add_bucket(&mut self, data_block: DataBlock) -> isize { if let Some(block_meta) = data_block.get_meta() { if let Some(block_meta) = AggregateMeta::::downcast_ref_from(block_meta) { - let bucket = match block_meta { + let (bucket, res) = match block_meta { + AggregateMeta::Spilling(_) => unreachable!(), AggregateMeta::Partitioned { .. } => unreachable!(), - AggregateMeta::Serialized(payload) => payload.bucket, - AggregateMeta::HashTable(payload) => payload.bucket, + AggregateMeta::Spilled(payload) => (payload.bucket, SINGLE_LEVEL_BUCKET_NUM), + AggregateMeta::Serialized(payload) => (payload.bucket, payload.bucket), + AggregateMeta::HashTable(payload) => (payload.bucket, payload.bucket), }; if bucket > SINGLE_LEVEL_BUCKET_NUM { @@ -162,7 +167,7 @@ impl } }; - return bucket; + return res; } } } @@ -377,6 +382,8 @@ impl Processor )), Some(agg_block_meta) => { let data_blocks = match agg_block_meta { + AggregateMeta::Spilled(_) => unreachable!(), + AggregateMeta::Spilling(_) => unreachable!(), AggregateMeta::Partitioned { .. } => unreachable!(), AggregateMeta::Serialized(payload) => self.partition_block(payload)?, AggregateMeta::HashTable(payload) => self.partition_hashtable(payload)?, @@ -430,6 +437,17 @@ fn build_partition_bucket TransformGroupBySpillReader::::create(input, output, operator), + false => TransformAggregateSpillReader::::create(input, output, operator), + } + })?; + } + pipeline.add_transform( |input, output| match params.aggregate_functions.is_empty() { true => { diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs index a61a7b05565b..6c1eb5278af9 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs @@ -28,6 +28,10 @@ impl Area { Area { bump: Bump::new() } } + pub fn allocated_bytes(&self) -> usize { + self.bump.allocated_bytes() + } + pub fn alloc_layout(&mut self, layout: Layout) -> NonNull { self.bump.alloc_layout(layout) } @@ -46,6 +50,13 @@ impl ArenaHolder { _data: Arc::new(area), } } + + pub fn allocated_bytes(&self) -> usize { + match self._data.as_ref() { + None => 0, + Some(arena) => arena.allocated_bytes(), + } + } } impl Debug for ArenaHolder { diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index f414092c7172..0e2568270646 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -37,10 +37,15 @@ pub use aggregator::FinalSingleStateAggregator; pub use aggregator::HashTableCell; pub use aggregator::PartialSingleStateAggregator; pub use aggregator::PartitionedHashTableDropper; +pub use aggregator::TransformAggregateDeserializer; pub use aggregator::TransformAggregateSerializer; +pub use aggregator::TransformAggregateSpillReader; +pub use aggregator::TransformAggregateSpillWriter; pub use aggregator::TransformFinalAggregate; pub use aggregator::TransformGroupByDeserializer; pub use aggregator::TransformGroupBySerializer; +pub use aggregator::TransformGroupBySpillReader; +pub use aggregator::TransformGroupBySpillWriter; pub use aggregator::TransformPartialAggregate; pub use aggregator::TransformPartialGroupBy; use common_pipeline_transforms::processors::transforms::transform; diff --git 
a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt index b0a2f85863af..791f2da4f173 100644 --- a/src/query/service/tests/it/storages/testdata/settings_table.txt +++ b/src/query/service/tests/it/storages/testdata/settings_table.txt @@ -29,6 +29,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System | "quoted_ident_case_sensitive" | "1" | "1" | "SESSION" | "Case sensitivity of quoted identifiers, default value: 1 (aka case-sensitive)." | "UInt64" | | "retention_period" | "12" | "12" | "SESSION" | "The retention_period in hours. By default the value is 12 hours." | "UInt64" | | "sandbox_tenant" | "" | "" | "SESSION" | "Inject a custom sandbox_tenant into this session, it's only for testing purpose and take effect when the internal_enable_sandbox_tenant is on" | "String" | +| "spilling_bytes_threshold_per_proc" | "0" | "0" | "SESSION" | "When the memory used by the aggregator exceeds the set value, the data will overflow into the storage. disable if it's 0." | "UInt64" | | "sql_dialect" | "PostgreSQL" | "PostgreSQL" | "SESSION" | "SQL dialect, support \"PostgreSQL\" \"MySQL\" and \"Hive\", default value: \"PostgreSQL\"." | "String" | | "storage_fetch_part_num" | "2" | "2" | "SESSION" | "The max number of part each read cycle." | "UInt64" | | "storage_io_max_page_bytes_for_read" | "524288" | "524288" | "SESSION" | "The maximum bytes of one IO request to read. Default the value is 512KB" | "UInt64" | diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index 4a52ec82d097..791d9ff44afe 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -532,6 +532,16 @@ impl Settings { Default is FALSE (disabled).", possible_values: None, }, + SettingValue { + default_value: UserSettingValue::UInt64(0), + user_setting: UserSetting::create( + "spilling_bytes_threshold_per_proc", + UserSettingValue::UInt64(0), + ), + level: ScopeLevel::Session, + desc: "When the memory used by the aggregator exceeds the set value, the data will overflow into the storage. disable if it's 0.", + possible_values: None, + }, ]; let settings: Arc> = Arc::new(DashMap::default()); @@ -883,6 +893,16 @@ impl Settings { self.try_get_u64(key).map(|v| v != 0) } + pub fn get_spilling_bytes_threshold_per_proc(&self) -> Result { + let key = "spilling_bytes_threshold_per_proc"; + self.try_get_u64(key).map(|v| v as usize) + } + + pub fn set_spilling_bytes_threshold_per_proc(&self, value: usize) -> Result<()> { + let key = "spilling_bytes_threshold_per_proc"; + self.try_set_u64(key, value as u64, false) + } + pub fn has_setting(&self, key: &str) -> bool { self.settings.get(key).is_some() } diff --git a/tests/sqllogictests/suites/base/03_common/03_0037_spill_aggregator b/tests/sqllogictests/suites/base/03_common/03_0037_spill_aggregator new file mode 100644 index 000000000000..e6e6c9bc7cf8 --- /dev/null +++ b/tests/sqllogictests/suites/base/03_common/03_0037_spill_aggregator @@ -0,0 +1,21 @@ +statement ok +set max_threads = 8; + +statement ok +set global spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; + +query TIFS +SELECT COUNT() FROM (SELECT number::string, count() FROM numbers_mt(100000) group by number::string); +---- +100000 + +query TIFS +SELECT COUNT() FROM (SELECT number::string FROM numbers_mt(100000) group by number::string); +---- +100000 + +statement ok +unset max_threads; + +statement ok +set global spilling_bytes_threshold_per_proc = 0;
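
Editor's note: the spill file format introduced by this patch is implicit in TransformAggregateSpillWriter::async_process (and its group-by counterpart) and in TransformSpillReader::deserialize. Each column of the serialized bucket is encoded independently, the per-column byte lengths are recorded as columns_layout, and the buffers are concatenated into a single object written to {location_prefix}/{GlobalUniqName}. Spilling itself is triggered in the partial aggregators when the partitioned hash table's allocated_bytes() exceeds spilling_bytes_threshold_per_proc. The sketch below is illustrative only and not part of the patch: pack_columns and unpack_columns are hypothetical helper names, and plain byte buffers stand in for common_expression::arrow::serialize_column / deserialize_column and the OpenDAL object I/O.

// Illustrative only: mirrors the buffer concatenation done in async_process() and the
// slicing done in TransformSpillReader::deserialize(). Helper names are hypothetical.

/// Concatenate per-column buffers into one spill blob, recording each column's length.
fn pack_columns(columns_data: Vec<Vec<u8>>) -> (Vec<u8>, Vec<usize>) {
    let total_size: usize = columns_data.iter().map(|c| c.len()).sum();
    let mut write_data = Vec::with_capacity(total_size);
    let mut columns_layout = Vec::with_capacity(columns_data.len());
    for data in columns_data {
        columns_layout.push(data.len());
        write_data.extend(data);
    }
    (write_data, columns_layout)
}

/// Slice the blob back into per-column buffers using the recorded layout.
fn unpack_columns(data: &[u8], columns_layout: &[usize]) -> Vec<Vec<u8>> {
    let mut begin = 0;
    let mut columns = Vec::with_capacity(columns_layout.len());
    for len in columns_layout {
        columns.push(data[begin..begin + len].to_vec());
        begin += len;
    }
    columns
}

fn main() {
    // Two "columns": in the real writer these are the serialized aggregate states and group keys.
    let columns = vec![b"agg-states".to_vec(), b"group-keys".to_vec()];
    let (blob, layout) = pack_columns(columns.clone());
    assert_eq!(unpack_columns(&blob, &layout), columns);
}

Because the layout is a plain concatenation, the spilled object needs no framing bytes of its own; everything required to slice it apart travels with the block meta (AggregateSerdeMeta with typ = SPILLED_TYPE, bucket, location, columns_layout) through the pipeline and, in cluster mode, through the exchange.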