Skip to content

Commit

Permalink
feat(query): support spill for new agg hashtable (databendlabs#14905)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freejww authored and yufan022 committed Jun 18, 2024
1 parent fcd21c2 commit 6017ebf
Show file tree
Hide file tree
Showing 45 changed files with 1,108 additions and 355 deletions.
33 changes: 27 additions & 6 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// A new AggregateHashtable which inspired by duckdb's https://duckdb.org/2022/03/07/aggregate-hashtable.html

use std::sync::atomic::Ordering;
use std::sync::Arc;

use bumpalo::Bump;
use databend_common_exception::Result;

use super::partitioned_payload::PartitionedPayload;
Expand All @@ -24,6 +26,7 @@ use super::probe_state::ProbeState;
use crate::aggregate::payload_row::row_match_columns;
use crate::group_hash_columns;
use crate::new_sel;
use crate::read;
use crate::types::DataType;
use crate::AggregateFunctionRef;
use crate::Column;
Expand All @@ -44,7 +47,7 @@ pub struct AggregateHashTable {
pub payload: PartitionedPayload,
// use for append rows directly during deserialize
pub direct_append: bool,
config: HashTableConfig,
pub config: HashTableConfig,
current_radix_bits: u64,
entries: Vec<Entry>,
count: usize,
Expand All @@ -59,23 +62,30 @@ impl AggregateHashTable {
group_types: Vec<DataType>,
aggrs: Vec<AggregateFunctionRef>,
config: HashTableConfig,
arena: Arc<Bump>,
) -> Self {
let capacity = Self::initial_capacity();
Self::new_with_capacity(group_types, aggrs, config, capacity)
Self::new_with_capacity(group_types, aggrs, config, capacity, arena)
}

pub fn new_with_capacity(
group_types: Vec<DataType>,
aggrs: Vec<AggregateFunctionRef>,
config: HashTableConfig,
capacity: usize,
arena: Arc<Bump>,
) -> Self {
Self {
entries: vec![0u64; capacity],
count: 0,
direct_append: false,
current_radix_bits: config.initial_radix_bits,
payload: PartitionedPayload::new(group_types, aggrs, 1 << config.initial_radix_bits),
payload: PartitionedPayload::new(
group_types,
aggrs,
1 << config.initial_radix_bits,
vec![arena],
),
capacity,
config,
}
Expand Down Expand Up @@ -150,8 +160,8 @@ impl AggregateHashTable {
if !self.payload.aggrs.is_empty() {
for i in 0..row_count {
state.state_places[i] = unsafe {
StateAddr::new(core::ptr::read::<u64>(
state.addresses[i].add(self.payload.state_offset) as _,
StateAddr::new(read::<u64>(
state.addresses[i].add(self.payload.state_offset) as _
) as usize)
};
}
Expand Down Expand Up @@ -356,7 +366,7 @@ impl AggregateHashTable {
if !self.payload.aggrs.is_empty() {
for i in 0..row_count {
flush_state.probe_state.state_places[i] = unsafe {
StateAddr::new(core::ptr::read::<u64>(
StateAddr::new(read::<u64>(
flush_state.probe_state.addresses[i].add(self.payload.state_offset)
as _,
) as usize)
Expand Down Expand Up @@ -446,6 +456,7 @@ impl AggregateHashTable {
self.payload.group_types.clone(),
self.payload.aggrs.clone(),
1,
vec![Arc::new(Bump::new())],
);
let payload = std::mem::replace(&mut self.payload, temp_payload);
let mut state = PayloadFlushState::default();
Expand Down Expand Up @@ -530,6 +541,16 @@ impl AggregateHashTable {
/// Reset the hashtable's element counter to zero.
///
/// Only the `count` field is cleared; the entry array and the partitioned
/// payload are left untouched, so the caller is responsible for clearing or
/// replacing them if a full reset is intended.
pub fn reset_count(&mut self) {
    self.count = 0;
}

/// Total bytes attributable to this hashtable: the payload's own memory
/// footprint plus everything allocated by the payload's bump arenas.
/// Used to decide when aggregation state should spill.
pub fn allocated_bytes(&self) -> usize {
    // Sum the bytes held by every arena backing the payload.
    let arena_bytes: usize = self
        .payload
        .arenas
        .iter()
        .map(|arena| arena.allocated_bytes())
        .sum();
    self.payload.memory_size() + arena_bytes
}
}

/// Upper 16 bits are salt
Expand Down
25 changes: 17 additions & 8 deletions src/query/expression/src/aggregate/partitioned_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;

use super::payload::Payload;
use super::probe_state::ProbeState;
use crate::read;
use crate::types::DataType;
use crate::AggregateFunctionRef;
use crate::Column;
Expand Down Expand Up @@ -54,14 +55,13 @@ impl PartitionedPayload {
group_types: Vec<DataType>,
aggrs: Vec<AggregateFunctionRef>,
partition_count: u64,
arenas: Vec<Arc<Bump>>,
) -> Self {
let radix_bits = partition_count.trailing_zeros() as u64;
debug_assert_eq!(1 << radix_bits, partition_count);

let arena = Arc::new(Bump::new());

let payloads = (0..partition_count)
.map(|_| Payload::new(arena.clone(), group_types.clone(), aggrs.clone()))
.map(|_| Payload::new(arenas[0].clone(), group_types.clone(), aggrs.clone()))
.collect_vec();

let group_sizes = payloads[0].group_sizes.clone();
Expand All @@ -85,7 +85,7 @@ impl PartitionedPayload {
state_layout,
partition_count,

arenas: vec![arena],
arenas,
mask_v: mask(radix_bits),
shift_v: shift(radix_bits),
}
Expand Down Expand Up @@ -145,13 +145,14 @@ impl PartitionedPayload {
self.group_types.clone(),
self.aggrs.clone(),
new_partition_count as u64,
self.arenas.clone(),
);

new_partition_payload.combine(self, state);
new_partition_payload
}

pub fn combine(&mut self, mut other: PartitionedPayload, state: &mut PayloadFlushState) {
pub fn combine(&mut self, other: PartitionedPayload, state: &mut PayloadFlushState) {
if other.partition_count == self.partition_count {
for (l, r) in self.payloads.iter_mut().zip(other.payloads.into_iter()) {
l.combine(r);
Expand All @@ -163,7 +164,6 @@ impl PartitionedPayload {
self.combine_single(payload, state)
}
}
self.arenas.append(&mut other.arenas);
}

pub fn combine_single(&mut self, mut other: Payload, state: &mut PayloadFlushState) {
Expand Down Expand Up @@ -218,8 +218,7 @@ impl PartitionedPayload {
for idx in 0..rows {
state.addresses[idx] = other.data_ptr(page, idx + state.flush_page_row);

let hash =
unsafe { core::ptr::read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };
let hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };

let partition_idx = ((hash & self.mask_v) >> self.shift_v) as usize;

Expand Down Expand Up @@ -250,6 +249,16 @@ impl PartitionedPayload {
/// Aggregate memory footprint of this partitioned payload, computed as the
/// sum of the per-partition payload sizes.
pub fn memory_size(&self) -> usize {
    let mut total = 0usize;
    for payload in &self.payloads {
        total += payload.memory_size();
    }
    total
}

/// Returns `true` if `other` is one of the arenas backing this payload.
///
/// Identity is pointer equality on the `Arc` (the same allocation), not
/// structural equality — two distinct arenas with identical contents do
/// not match. Replaces a manual loop with the idiomatic `Iterator::any`,
/// which short-circuits on the first match exactly like the early return.
pub fn include_arena(&self, other: &Arc<Bump>) -> bool {
    self.arenas.iter().any(|arena| Arc::ptr_eq(arena, other))
}
}

#[inline]
Expand Down
35 changes: 25 additions & 10 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ use std::mem::MaybeUninit;
use std::sync::Arc;

use bumpalo::Bump;
use itertools::Itertools;
use strength_reduce::StrengthReducedU64;

use super::payload_row::rowformat_size;
use super::payload_row::serialize_column_to_rowformat;
use crate::get_layout_offsets;
use crate::read;
use crate::store;
use crate::types::DataType;
use crate::AggregateFunctionRef;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::PayloadFlushState;
use crate::SelectVector;
use crate::StateAddr;
Expand All @@ -40,7 +44,6 @@ use crate::MAX_PAGE_SIZE;
// [STATE_ADDRS] is the state_addrs of the aggregate functions, 8 bytes each
pub struct Payload {
pub arena: Arc<Bump>,
pub arenas: Vec<Arc<Bump>>,
// if true, the states are moved out of the payload into other payload, and will not be dropped
pub state_move_out: bool,
pub group_types: Vec<DataType>,
Expand Down Expand Up @@ -123,8 +126,7 @@ impl Payload {
let row_per_page = (u16::MAX as usize).min(MAX_PAGE_SIZE / tuple_size).max(1);

Self {
arena: arena.clone(),
arenas: vec![arena],
arena,
state_move_out: false,
pages: vec![],
current_write_page: 0,
Expand Down Expand Up @@ -235,14 +237,14 @@ impl Payload {
for idx in select_vector.iter().take(new_group_rows).copied() {
unsafe {
let dst = address[idx].add(write_offset);
store(val, dst as *mut u8);
store::<u8>(&val, dst as *mut u8);
}
}
} else {
for idx in select_vector.iter().take(new_group_rows).copied() {
unsafe {
let dst = address[idx].add(write_offset);
store(bitmap.get_bit(idx) as u8, dst as *mut u8);
store::<u8>(&(bitmap.get_bit(idx) as u8), dst as *mut u8);
}
}
}
Expand Down Expand Up @@ -273,7 +275,7 @@ impl Payload {
for idx in select_vector.iter().take(new_group_rows).copied() {
unsafe {
let dst = address[idx].add(write_offset);
store(group_hashes[idx], dst as *mut u8);
store::<u64>(&group_hashes[idx], dst as *mut u8);
}
}

Expand All @@ -285,7 +287,7 @@ impl Payload {
let place = self.arena.alloc_layout(layout);
unsafe {
let dst = address[idx].add(write_offset);
store(place.as_ptr() as u64, dst as *mut u8);
store::<u64>(&(place.as_ptr() as u64), dst as *mut u8);
}

let place = StateAddr::from(place);
Expand Down Expand Up @@ -363,8 +365,7 @@ impl Payload {
for idx in 0..rows {
state.addresses[idx] = self.data_ptr(page, idx + state.flush_page_row);

let hash =
unsafe { core::ptr::read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };
let hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };

let partition_idx = (hash % mods) as usize;

Expand All @@ -375,6 +376,20 @@ impl Payload {
state.flush_page_row = end;
true
}

/// Build an empty `DataBlock` whose schema mirrors this payload's output:
/// one zero-capacity column per aggregate result type, followed by one per
/// group-by type (in that order).
pub fn empty_block(&self) -> DataBlock {
    let mut columns = Vec::with_capacity(self.aggrs.len() + self.group_types.len());
    // Aggregate result columns come first.
    for func in &self.aggrs {
        columns.push(ColumnBuilder::with_capacity(&func.return_type().unwrap(), 0).build());
    }
    // Then the group-key columns.
    for ty in &self.group_types {
        columns.push(ColumnBuilder::with_capacity(ty, 0).build());
    }
    DataBlock::new_from_columns(columns)
}
}

impl Drop for Payload {
Expand All @@ -386,7 +401,7 @@ impl Drop for Payload {
for page in self.pages.iter() {
for row in 0..page.rows {
unsafe {
let state_place = StateAddr::new(core::ptr::read::<u64>(
let state_place = StateAddr::new(read::<u64>(
self.data_ptr(page, row).add(self.state_offset) as _,
)
as usize);
Expand Down
Loading

0 comments on commit 6017ebf

Please sign in to comment.