feat(query): support aggregate spill to object storage #10273

Merged 16 commits on Mar 1, 2023
41 changes: 41 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -56,6 +56,7 @@ use common_sql::executor::UnionAll;
use common_sql::plans::JoinType;
use common_sql::ColumnBinding;
use common_sql::IndexType;
use common_storage::DataOperator;

use super::processors::ProfileWrapper;
use crate::api::ExchangeSorting;
@@ -66,7 +67,9 @@ use crate::pipelines::processors::transforms::HashJoinDesc;
use crate::pipelines::processors::transforms::PartialSingleStateAggregator;
use crate::pipelines::processors::transforms::RightSemiAntiJoinCompactor;
use crate::pipelines::processors::transforms::TransformAggregateSerializer;
use crate::pipelines::processors::transforms::TransformAggregateSpillWriter;
use crate::pipelines::processors::transforms::TransformGroupBySerializer;
use crate::pipelines::processors::transforms::TransformGroupBySpillWriter;
use crate::pipelines::processors::transforms::TransformLeftJoin;
use crate::pipelines::processors::transforms::TransformMarkJoin;
use crate::pipelines::processors::transforms::TransformMergeBlock;
@@ -431,6 +434,44 @@ impl PipelineBuilder {
}
})?;

if self.ctx.get_cluster().is_empty() {
let operator = DataOperator::instance().operator();
let location_prefix = format!("_aggregate_spill/{}", self.ctx.get_tenant());
self.main_pipeline.add_transform(|input, output| {
let transform = match params.aggregate_functions.is_empty() {
true => with_mappedhash_method!(|T| match method.clone() {
HashMethodKind::T(method) => TransformGroupBySpillWriter::create(
input,
output,
method,
operator.clone(),
location_prefix.clone()
),
}),
false => with_mappedhash_method!(|T| match method.clone() {
HashMethodKind::T(method) => TransformAggregateSpillWriter::create(
input,
output,
method,
operator.clone(),
params.clone(),
location_prefix.clone()
),
}),
};

if self.enable_profiling {
Ok(ProcessorPtr::create(ProfileWrapper::create(
transform,
aggregate.plan_id,
self.prof_span_set.clone(),
)))
} else {
Ok(ProcessorPtr::create(transform))
}
})?;
}

if !self.ctx.get_cluster().is_empty() {
// TODO: can serialize only when needed.
self.main_pipeline.add_transform(|input, output| {
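Note on the branch above: spill writers are injected only when `self.ctx.get_cluster().is_empty()`, i.e. in standalone mode; the cluster branch below keeps the serializer-only path. A minimal sketch of the object-key convention implied by `location_prefix` (the tenant id is a made-up example):

```rust
// Hypothetical illustration: all aggregate spill objects for a tenant
// are keyed under `_aggregate_spill/<tenant>/` in the shared object store.
fn spill_prefix(tenant: &str) -> String {
    format!("_aggregate_spill/{}", tenant)
}

fn main() {
    assert_eq!(spill_prefix("tenant_a"), "_aggregate_spill/tenant_a");
}
```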
@@ -65,6 +65,20 @@ impl<T: HashMethodBounds, V: Send + Sync + 'static> HashTableCell<T, V> {
arena: Area::create(),
}
}

pub fn len(&self) -> usize {
self.hashtable.len()
}

pub fn allocated_bytes(&self) -> usize {
self.hashtable.bytes_len()
+ self.arena.allocated_bytes()
+ self
.arena_holders
.iter()
.map(ArenaHolder::allocated_bytes)
.sum::<usize>()
}
}

pub trait HashTableDropper<T: HashMethodBounds, V: Send + Sync + 'static> {
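`allocated_bytes` reports the full in-memory footprint of a cell: the hash table buckets, the cell's own arena, and every arena it holds. A hedged sketch of how a spill trigger might consume it (the threshold and function are hypothetical; the actual trigger is not part of this hunk):

```rust
// Hypothetical spill trigger driven by HashTableCell::allocated_bytes.
const SPILL_THRESHOLD_BYTES: usize = 1 << 30; // assumed 1 GiB budget

fn should_spill(allocated_bytes: usize) -> bool {
    allocated_bytes >= SPILL_THRESHOLD_BYTES
}

fn main() {
    assert!(!should_spill(64 * 1024 * 1024)); // 64 MiB stays in memory
    assert!(should_spill(2 << 30)); // 2 GiB exceeds the assumed budget
}
```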
@@ -21,6 +21,7 @@ use common_expression::DataBlock;

use crate::api::ExchangeSorting;
use crate::pipelines::processors::transforms::aggregator::serde::AggregateSerdeMeta;
use crate::pipelines::processors::transforms::aggregator::serde::BUCKET_TYPE;

pub struct AggregateExchangeSorting {}

@@ -38,7 +39,10 @@ impl ExchangeSorting for AggregateExchangeSorting {
None => Err(ErrorCode::Internal(
"Internal error, AggregateExchangeSorting only recv AggregateSerdeMeta",
)),
-                Some(meta_info) => Ok(meta_info.bucket),
+                Some(meta_info) => match meta_info.typ == BUCKET_TYPE {
+                    true => Ok(meta_info.bucket),
+                    false => Ok(-1),
+                },
},
}
}
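With this change, exchange sorting only trusts the bucket id of real bucket payloads; spilled metas report -1. A self-contained restatement of the rule (constants copied from this PR's serde_meta.rs; the helper function is hypothetical):

```rust
const BUCKET_TYPE: usize = 1;
const SPILLED_TYPE: usize = 2;

// Mirrors the match above: only bucket-typed metas expose their bucket id.
fn exchange_sorting_value(typ: usize, bucket: isize) -> isize {
    if typ == BUCKET_TYPE { bucket } else { -1 }
}

fn main() {
    assert_eq!(exchange_sorting_value(BUCKET_TYPE, 5), 5);
    assert_eq!(exchange_sorting_value(SPILLED_TYPE, 5), -1);
}
```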
@@ -43,9 +43,17 @@ impl SerializedPayload {
}
}

pub struct SpilledPayload {
pub bucket: isize,
pub location: String,
pub columns_layout: Vec<usize>,
}

pub enum AggregateMeta<Method: HashMethodBounds, V: Send + Sync + 'static> {
Serialized(SerializedPayload),
HashTable(HashTablePayload<Method, V>),
Spilling(HashTablePayload<Method, V>),
Spilled(SpilledPayload),

Partitioned { bucket: isize, data: Vec<Self> },
}
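The two new variants complete the spill lifecycle: an oversized `HashTable` payload becomes `Spilling` on its way to the writer, and after the flush only a `Spilled` marker remains in the stream. A self-contained sketch using a copy of the new `SpilledPayload` struct (the location and sizes are made-up examples):

```rust
// Copy of the SpilledPayload struct added in this hunk.
pub struct SpilledPayload {
    pub bucket: isize,
    pub location: String,
    pub columns_layout: Vec<usize>,
}

fn main() {
    // Made-up example: bucket 3 flushed to an assumed object key, with
    // three serialized columns of 128, 64 and 256 bytes.
    let marker = SpilledPayload {
        bucket: 3,
        location: "_aggregate_spill/tenant_a/0".to_string(),
        columns_layout: vec![128, 64, 256],
    };
    println!(
        "bucket {} at {} ({} bytes)",
        marker.bucket,
        marker.location,
        marker.columns_layout.iter().sum::<usize>()
    );
}
```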
@@ -66,6 +74,26 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> AggregateMeta<Method, V
}))
}

pub fn create_spilling(bucket: isize, cell: HashTableCell<Method, V>) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::Spilling(HashTablePayload {
cell,
bucket,
arena_holder: ArenaHolder::create(None),
}))
}

pub fn create_spilled(
bucket: isize,
location: String,
columns_layout: Vec<usize>,
) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::Spilled(SpilledPayload {
bucket,
location,
columns_layout,
}))
}

pub fn create_partitioned(bucket: isize, data: Vec<Self>) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::Partitioned { data, bucket })
}
@@ -99,6 +127,8 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Debug for AggregateMeta
AggregateMeta::Serialized { .. } => {
f.debug_struct("AggregateMeta::Serialized").finish()
}
AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(),
AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(),
}
}
}
@@ -41,5 +41,9 @@ pub use utils::*;

pub use self::serde::TransformAggregateDeserializer;
pub use self::serde::TransformAggregateSerializer;
pub use self::serde::TransformAggregateSpillReader;
pub use self::serde::TransformAggregateSpillWriter;
pub use self::serde::TransformGroupByDeserializer;
pub use self::serde::TransformGroupBySerializer;
pub use self::serde::TransformGroupBySpillReader;
pub use self::serde::TransformGroupBySpillWriter;
@@ -13,11 +13,21 @@
// limitations under the License.

 mod serde_meta;
+mod transform_aggregate_serializer;
+mod transform_aggregate_spill_writer;
 mod transform_deserializer;
-mod transform_serializer;
+mod transform_group_by_serializer;
+mod transform_group_by_spill_writer;
+mod transform_spill_reader;

 pub use serde_meta::AggregateSerdeMeta;
+pub use serde_meta::BUCKET_TYPE;
+pub use serde_meta::SPILLED_TYPE;
+pub use transform_aggregate_serializer::TransformAggregateSerializer;
+pub use transform_aggregate_spill_writer::TransformAggregateSpillWriter;
 pub use transform_deserializer::TransformAggregateDeserializer;
 pub use transform_deserializer::TransformGroupByDeserializer;
-pub use transform_serializer::TransformAggregateSerializer;
-pub use transform_serializer::TransformGroupBySerializer;
+pub use transform_group_by_serializer::TransformGroupBySerializer;
+pub use transform_group_by_spill_writer::TransformGroupBySpillWriter;
+pub use transform_spill_reader::TransformAggregateSpillReader;
+pub use transform_spill_reader::TransformGroupBySpillReader;
@@ -18,14 +18,39 @@ use common_expression::BlockMetaInfo;
use common_expression::BlockMetaInfoDowncast;
use common_expression::BlockMetaInfoPtr;

pub const BUCKET_TYPE: usize = 1;
pub const SPILLED_TYPE: usize = 2;

// Cannot change to enum, because bincode cannot deserialize custom enum
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct AggregateSerdeMeta {
pub typ: usize,
pub bucket: isize,
pub location: Option<String>,
pub columns_layout: Vec<usize>,
}

impl AggregateSerdeMeta {
pub fn create(bucket: isize) -> BlockMetaInfoPtr {
-        Box::new(AggregateSerdeMeta { bucket })
+        Box::new(AggregateSerdeMeta {
+            typ: BUCKET_TYPE,
+            bucket,
+            location: None,
+            columns_layout: vec![],
+        })
}

pub fn create_spilled(
bucket: isize,
location: String,
columns_layout: Vec<usize>,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: SPILLED_TYPE,
bucket,
columns_layout,
location: Some(location),
})
}
}

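Because bincode cannot deserialize a data-carrying enum here, the meta keeps an integer `typ` discriminant plus optional fields, and consumers branch on the constants. A hedged sketch of such a consumer (struct and constants copied from this hunk; `describe` is hypothetical):

```rust
pub const BUCKET_TYPE: usize = 1;
pub const SPILLED_TYPE: usize = 2;

pub struct AggregateSerdeMeta {
    pub typ: usize,
    pub bucket: isize,
    pub location: Option<String>,
    pub columns_layout: Vec<usize>,
}

// Hypothetical consumer: route a meta by its integer discriminant.
fn describe(meta: &AggregateSerdeMeta) -> String {
    match meta.typ {
        BUCKET_TYPE => format!("bucket {}", meta.bucket),
        SPILLED_TYPE => format!(
            "bucket {} spilled to {}",
            meta.bucket,
            meta.location.as_deref().unwrap_or("<unknown>")
        ),
        _ => "unexpected meta".to_string(),
    }
}

fn main() {
    let meta = AggregateSerdeMeta {
        typ: SPILLED_TYPE,
        bucket: 2,
        location: Some("_aggregate_spill/tenant_a/0".to_string()), // assumed key
        columns_layout: vec![64, 32],
    };
    assert_eq!(meta.columns_layout.len(), 2);
    println!("{}", describe(&meta));
}
```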
@@ -0,0 +1,123 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_expression::types::string::StringColumnBuilder;
use common_expression::Column;
use common_expression::DataBlock;
use common_functions::aggregates::StateAddr;
use common_hashtable::HashtableEntryRefLike;
use common_hashtable::HashtableLike;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::BlockMetaTransform;
use common_pipeline_transforms::processors::transforms::BlockMetaTransformer;

use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload;
use crate::pipelines::processors::transforms::aggregator::estimated_key_size;
use crate::pipelines::processors::transforms::aggregator::serde::serde_meta::AggregateSerdeMeta;
use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder;
use crate::pipelines::processors::AggregatorParams;

pub struct TransformAggregateSerializer<Method: HashMethodBounds> {
method: Method,
params: Arc<AggregatorParams>,
}

impl<Method: HashMethodBounds> TransformAggregateSerializer<Method> {
pub fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
method: Method,
params: Arc<AggregatorParams>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(BlockMetaTransformer::create(
input,
output,
TransformAggregateSerializer { method, params },
)))
}
}

impl<Method> BlockMetaTransform<AggregateMeta<Method, usize>>
for TransformAggregateSerializer<Method>
where Method: HashMethodBounds
{
const NAME: &'static str = "TransformAggregateSerializer";

fn transform(&mut self, meta: AggregateMeta<Method, usize>) -> Result<DataBlock> {
match meta {
AggregateMeta::Spilling(_) => unreachable!(),
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::Serialized(_) => unreachable!(),
AggregateMeta::Spilled(payload) => Ok(DataBlock::empty_with_meta(
AggregateSerdeMeta::create_spilled(
payload.bucket,
payload.location,
payload.columns_layout,
),
)),
AggregateMeta::HashTable(payload) => {
let bucket = payload.bucket;
let data_block = serialize_aggregate(&self.method, &self.params, payload)?;
data_block.add_meta(Some(AggregateSerdeMeta::create(bucket)))
}
}
}
}

pub fn serialize_aggregate<Method: HashMethodBounds>(
method: &Method,
params: &Arc<AggregatorParams>,
payload: HashTablePayload<Method, usize>,
) -> Result<DataBlock> {
let keys_len = payload.cell.hashtable.len();
let value_size = estimated_key_size(&payload.cell.hashtable);

let funcs = &params.aggregate_functions;
let offsets_aggregate_states = &params.offsets_aggregate_states;

// Builders.
let mut state_builders = (0..funcs.len())
.map(|_| StringColumnBuilder::with_capacity(keys_len, keys_len * 4))
.collect::<Vec<_>>();

let mut group_key_builder = method.keys_column_builder(keys_len, value_size);

for group_entity in payload.cell.hashtable.iter() {
let place = Into::<StateAddr>::into(*group_entity.get());

for (idx, func) in funcs.iter().enumerate() {
let arg_place = place.next(offsets_aggregate_states[idx]);
func.serialize(arg_place, &mut state_builders[idx].data)?;
state_builders[idx].commit_row();
}

group_key_builder.append_value(group_entity.key());
}

let mut columns = Vec::with_capacity(state_builders.len() + 1);

for builder in state_builders.into_iter() {
columns.push(Column::String(builder.build()));
}

columns.push(group_key_builder.finish());
Ok(DataBlock::new_from_columns(columns))
}