diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index d3a11cd01116..7911dd33d14c 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -133,9 +133,10 @@ impl DataBlock { c.check_valid()?; if c.len() != num_rows { return Err(ErrorCode::Internal(format!( - "DataBlock corrupted, column length mismatch, col: {}, num_rows: {}", + "DataBlock corrupted, column length mismatch, col rows: {}, num_rows: {}, datatype: {}", c.len(), - num_rows + num_rows, + c.data_type() ))); } } diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index 92de1d47286d..957fb93e6180 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -21,6 +21,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use itertools::Itertools; +use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; @@ -462,7 +463,7 @@ impl Column { } let remaining = len - idx; if remaining > 0 { - let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits( + let (cur_buf, cur_unset_bits) = copy_continuous_bits( &mut builder_ptr, bitmap_slice, builder_idx, diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index edebecee1310..98916fbeb66b 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -20,27 +20,16 @@ use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; +use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; -use crate::kernels::utils::BitChunks; -use crate::types::array::ArrayColumn; -use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; -use crate::types::decimal::DecimalColumn; -use crate::types::geography::GeographyColumn; -use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; -use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::AnyType; -use crate::types::ArrayType; -use crate::types::BooleanType; -use crate::types::MapType; -use crate::types::ValueType; -use crate::with_decimal_type; -use crate::with_number_type; +use crate::types::*; +use crate::visitor::ValueVisitor; use crate::BlockEntry; use crate::Column; use crate::ColumnBuilder; @@ -60,18 +49,27 @@ impl DataBlock { if count_zeros == self.num_rows() { return Ok(self.slice(0..0)); } + + let mut filter_visitor = FilterVisitor::new(bitmap); + let after_columns = self .columns() .iter() - .map(|entry| match &entry.value { - Value::Column(c) => { - let value = Value::Column(Column::filter(c, bitmap)); - BlockEntry::new(entry.data_type.clone(), value) - } - _ => entry.clone(), + .map(|entry| { + filter_visitor.visit_value(entry.value.clone())?; + let result = filter_visitor.result.take().unwrap(); + Ok(BlockEntry { + value: result, + data_type: entry.data_type.clone(), + }) }) - .collect(); - Ok(DataBlock::new(after_columns, self.num_rows() - count_zeros)) + .collect::>>()?; + + Ok(DataBlock::new_with_meta( + after_columns, + self.num_rows() - count_zeros, + self.get_meta().cloned(), + )) } } } @@ -95,127 +93,97 @@ impl DataBlock { } impl Column { - pub fn filter(&self, filter: &Bitmap) -> Column { - let length = filter.len() - filter.unset_bits(); - if length == self.len() { - return self.clone(); + pub fn filter(&self, bitmap: &Bitmap) -> Column { + let mut filter_visitor = FilterVisitor::new(bitmap); + filter_visitor + .visit_value(Value::Column(self.clone())) + .unwrap(); + filter_visitor + .result + .take() + .unwrap() + .as_column() + .unwrap() + .clone() + } +} + +struct FilterVisitor<'a> { + filter: &'a Bitmap, + result: Option>, + num_rows: usize, +} + +impl<'a> FilterVisitor<'a> { + pub fn new(filter: &'a Bitmap) -> Self { + let num_rows = filter.len() - filter.unset_bits(); + Self { + filter, + result: None, + num_rows, } + } +} - match self { - Column::Null { .. } => Column::Null { len: length }, - Column::EmptyArray { .. } => Column::EmptyArray { len: length }, - Column::EmptyMap { .. } => Column::EmptyMap { len: length }, - Column::Number(column) => with_number_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => { - Column::Number(NumberColumn::NUM_TYPE(Self::filter_primitive_types( - values, filter, - ))) - } - }), - Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { - DecimalColumn::DECIMAL_TYPE(values, size) => { - Column::Decimal(DecimalColumn::DECIMAL_TYPE( - Self::filter_primitive_types(values, filter), - *size, - )) +impl<'a> ValueVisitor for FilterVisitor<'a> { + fn visit_value(&mut self, value: Value) -> Result<()> { + match value { + Value::Scalar(c) => self.visit_scalar(c), + Value::Column(c) => { + if c.len() == self.num_rows || c.len() == 0 { + self.result = Some(Value::Column(c)); + } else if self.num_rows == 0 { + self.result = Some(Value::Column(c.slice(0..1))); + } else { + self.visit_column(c)?; } - }), - Column::Boolean(bm) => { - let column = Self::filter_boolean_types(bm, filter); - Column::Boolean(column) - } - Column::Binary(column) => { - let column = Self::filter_binary_scalars(column, filter); - Column::Binary(column) - } - Column::String(column) => { - let column = Self::filter_string_scalars(column, filter); - Column::String(column) - } - Column::Timestamp(column) => { - let ts = Self::filter_primitive_types(column, filter); - Column::Timestamp(ts) - } - Column::Date(column) => { - let d = Self::filter_primitive_types(column, filter); - Column::Date(d) - } - Column::Array(column) => { - let mut offsets = Vec::with_capacity(length + 1); - offsets.push(0); - let builder = ColumnBuilder::with_capacity(&column.values.data_type(), length); - let builder = ArrayColumnBuilder { builder, offsets }; - Self::filter_scalar_types::>(column, builder, filter) - } - Column::Map(column) => { - let mut offsets = Vec::with_capacity(length + 1); - offsets.push(0); - let builder = ColumnBuilder::from_column( - ColumnBuilder::with_capacity(&column.values.data_type(), length).build(), - ); - let (key_builder, val_builder) = match builder { - ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()), - _ => unreachable!(), - }; - let builder = KvColumnBuilder { - keys: key_builder, - values: val_builder, - }; - let builder = ArrayColumnBuilder { builder, offsets }; - let column = ArrayColumn::try_downcast(column).unwrap(); - Self::filter_scalar_types::>(&column, builder, filter) - } - Column::Bitmap(column) => { - let column = Self::filter_binary_scalars(column, filter); - Column::Bitmap(column) - } - - Column::Nullable(c) => { - let column = Self::filter(&c.column, filter); - let validity = Self::filter_boolean_types(&c.validity, filter); - NullableColumn::new_column(column, validity) - } - Column::Tuple(fields) => { - let fields = fields.iter().map(|c| c.filter(filter)).collect(); - Column::Tuple(fields) - } - Column::Variant(column) => { - let column = Self::filter_binary_scalars(column, filter); - Column::Variant(column) - } - Column::Geometry(column) => { - let column = Self::filter_binary_scalars(column, filter); - Column::Geometry(column) - } - Column::Geography(column) => { - let column = Self::filter_binary_scalars(&column.0, filter); - Column::Geography(GeographyColumn(column)) + Ok(()) } } } - fn filter_scalar_types( - col: &T::Column, - mut builder: T::ColumnBuilder, - filter: &Bitmap, - ) -> Column { + fn visit_scalar(&mut self, scalar: crate::Scalar) -> Result<()> { + self.result = Some(Value::Scalar(scalar)); + Ok(()) + } + + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + self.visit_boolean(column.validity.clone())?; + let validity = + BooleanType::try_downcast_column(self.result.take().unwrap().as_column().unwrap()) + .unwrap(); + + self.visit_column(column.column)?; + let result = self.result.take().unwrap(); + let result = result.as_column().unwrap(); + self.result = Some(Value::Column(NullableColumn::new_column( + result.clone(), + validity, + ))); + Ok(()) + } + + fn visit_typed_column(&mut self, column: ::Column) -> Result<()> { + let c = T::upcast_column(column.clone()); + let builder = ColumnBuilder::with_capacity(&c.data_type(), c.len()); + let mut builder = T::try_downcast_owned_builder(builder).unwrap(); + const CHUNK_SIZE: usize = 64; - let (mut slice, offset, mut length) = filter.as_slice(); + let (mut slice, offset, mut length) = self.filter.as_slice(); let mut start_index: usize = 0; - if offset > 0 { // If `offset` > 0, the valid bits of this byte start at `offset`, and the // max num of valid bits is `8 - offset`, but we also need to ensure that // we cannot iterate more than `length` bits. let n = std::cmp::min(8 - offset, length); start_index += n; - filter + self.filter .iter() .enumerate() .take(n) .for_each(|(index, is_selected)| { if is_selected { - T::push_item(&mut builder, T::index_column(col, index).unwrap()); + T::push_item(&mut builder, T::index_column(&column, index).unwrap()); } }); slice = &slice[1..]; @@ -231,7 +199,7 @@ impl Column { while mask != 0 { let n = mask.trailing_zeros() as usize; let index = mask_index * CHUNK_SIZE + n + start_index; - T::push_item(&mut builder, T::index_column(col, index).unwrap()); + T::push_item(&mut builder, T::index_column(&column, index).unwrap()); mask = mask & (mask - 1); } }); @@ -243,27 +211,242 @@ impl Column { .for_each(|(mask_index, is_selected)| { if is_selected { let index = mask_index + remainder_start + start_index; - T::push_item(&mut builder, T::index_column(col, index).unwrap()); + T::push_item(&mut builder, T::index_column(&column, index).unwrap()); } }); - T::upcast_column(T::build_column(builder)) + self.result = Some(Value::Column(T::upcast_column(T::build_column(builder)))); + Ok(()) } - /// low-level API using unsafe to improve performance. - fn filter_primitive_types(values: &Buffer, filter: &Bitmap) -> Buffer { - debug_assert_eq!(values.len(), filter.len()); - let num_rows = filter.len() - filter.unset_bits(); - if num_rows == values.len() { - return values.clone(); - } else if num_rows == 0 { - return vec![].into(); + fn visit_number( + &mut self, + buffer: as ValueType>::Column, + ) -> Result<()> { + self.result = Some(Value::Column(NumberType::::upcast_column( + self.filter_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(TimestampType::upcast_column( + self.filter_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(DateType::upcast_column( + self.filter_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_decimal( + &mut self, + buffer: Buffer, + size: DecimalSize, + ) -> Result<()> { + self.result = Some(Value::Column(T::upcast_column( + self.filter_primitive_types(buffer), + size, + ))); + Ok(()) + } + + fn visit_boolean(&mut self, bitmap: Bitmap) -> Result<()> { + let capacity = self.num_rows.saturating_add(7) / 8; + let mut builder: Vec = Vec::with_capacity(capacity); + let mut builder_ptr = builder.as_mut_ptr(); + let mut builder_idx = 0; + let mut unset_bits = 0; + let mut buf = 0; + + let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice(); + let mut bitmap_idx = 0; + + let (mut filter_slice, filter_offset, mut filter_length) = self.filter.as_slice(); + unsafe { + if filter_offset > 0 { + let mut mask = filter_slice[0]; + while mask != 0 { + let n = mask.trailing_zeros() as usize; + // If `filter_length` > 0, the valid bits of this byte start at `filter_offset`, we also + // need to ensure that we cannot iterate more than `filter_length` bits. + if n >= filter_offset && n < filter_offset + filter_length { + if bitmap.get_bit_unchecked(n - filter_offset) { + buf |= BIT_MASK[builder_idx % 8]; + } else { + unset_bits += 1; + } + builder_idx += 1; + } + mask = mask & (mask - 1); + } + let bits_to_align = 8 - filter_offset; + filter_length = if filter_length >= bits_to_align { + filter_length - bits_to_align + } else { + 0 + }; + filter_slice = &filter_slice[1..]; + bitmap_idx += bits_to_align; + } + + const CHUNK_SIZE: usize = 64; + let mut mask_chunks = BitChunksExact::::new(filter_slice, filter_length); + let mut continuous_selected = 0; + for mut mask in mask_chunks.by_ref() { + if mask == u64::MAX { + continuous_selected += CHUNK_SIZE; + } else { + if continuous_selected > 0 { + if builder_idx % 8 != 0 { + while continuous_selected > 0 { + if bitmap.get_bit_unchecked(bitmap_idx) { + buf |= BIT_MASK[builder_idx % 8]; + } else { + unset_bits += 1; + } + bitmap_idx += 1; + builder_idx += 1; + continuous_selected -= 1; + if builder_idx % 8 == 0 { + store_advance_aligned(buf, &mut builder_ptr); + buf = 0; + break; + } + } + } + + if continuous_selected > 0 { + let (cur_buf, cur_unset_bits) = copy_continuous_bits( + &mut builder_ptr, + bitmap_slice, + builder_idx, + bitmap_idx + bitmap_offset, + continuous_selected, + ); + builder_idx += continuous_selected; + bitmap_idx += continuous_selected; + unset_bits += cur_unset_bits; + buf = cur_buf; + continuous_selected = 0; + } + } + + while mask != 0 { + let n = mask.trailing_zeros() as usize; + if bitmap.get_bit_unchecked(bitmap_idx + n) { + buf |= BIT_MASK[builder_idx % 8]; + } else { + unset_bits += 1; + } + builder_idx += 1; + if builder_idx % 8 == 0 { + store_advance_aligned(buf, &mut builder_ptr); + buf = 0; + } + mask = mask & (mask - 1); + } + bitmap_idx += CHUNK_SIZE; + } + } + + if continuous_selected > 0 { + if builder_idx % 8 != 0 { + while continuous_selected > 0 { + if bitmap.get_bit_unchecked(bitmap_idx) { + buf |= BIT_MASK[builder_idx % 8]; + } else { + unset_bits += 1; + } + bitmap_idx += 1; + builder_idx += 1; + continuous_selected -= 1; + if builder_idx % 8 == 0 { + store_advance_aligned(buf, &mut builder_ptr); + buf = 0; + break; + } + } + } + + if continuous_selected > 0 { + let (cur_buf, cur_unset_bits) = copy_continuous_bits( + &mut builder_ptr, + bitmap_slice, + builder_idx, + bitmap_idx + bitmap_offset, + continuous_selected, + ); + builder_idx += continuous_selected; + bitmap_idx += continuous_selected; + unset_bits += cur_unset_bits; + buf = cur_buf; + } + } + + for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { + if is_selected { + if bitmap.get_bit_unchecked(bitmap_idx + i) { + buf |= BIT_MASK[builder_idx % 8]; + } else { + unset_bits += 1; + } + builder_idx += 1; + if builder_idx % 8 == 0 { + store_advance_aligned(buf, &mut builder_ptr); + buf = 0; + } + } + } + + if builder_idx % 8 != 0 { + store_advance_aligned(buf, &mut builder_ptr); + } } - let mut builder: Vec = Vec::with_capacity(num_rows); + let bitmap = unsafe { + set_vec_len_by_ptr(&mut builder, builder_ptr); + Bitmap::from_inner(Arc::new(builder.into()), 0, self.num_rows, unset_bits) + .ok() + .unwrap() + }; + self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); + return Ok(()); + } + + fn visit_binary(&mut self, col: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(BinaryType::upcast_column( + self.filter_binary_types(&col), + ))); + Ok(()) + } + + fn visit_string(&mut self, column: StringColumn) -> Result<()> { + let column: BinaryColumn = column.into(); + self.result = Some(Value::Column(StringType::upcast_column(unsafe { + StringColumn::from_binary_unchecked(self.filter_binary_types(&column)) + }))); + Ok(()) + } + + fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(VariantType::upcast_column( + self.filter_binary_types(&column), + ))); + Ok(()) + } +} + +impl<'a> FilterVisitor<'a> { + fn filter_primitive_types(&mut self, buffer: Buffer) -> Buffer { + let mut builder: Vec = Vec::with_capacity(self.num_rows); let mut ptr = builder.as_mut_ptr(); - let mut values_ptr = values.as_slice().as_ptr(); - let (mut slice, offset, mut length) = filter.as_slice(); + let mut values_ptr = buffer.as_slice().as_ptr(); + let (mut slice, offset, mut length) = self.filter.as_slice(); unsafe { if offset > 0 { @@ -324,23 +507,14 @@ impl Column { builder.into() } - /// low-level API using unsafe to improve performance. - fn filter_binary_scalars(values: &BinaryColumn, filter: &Bitmap) -> BinaryColumn { - debug_assert_eq!(values.len(), filter.len()); - let num_rows = filter.len() - filter.unset_bits(); - if num_rows == values.len() { - return values.clone(); - } else if num_rows == 0 { - return BinaryColumn::new(vec![].into(), vec![0].into()); - } - + fn filter_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { // Each element of `items` is (string pointer(u64), string length). - let mut items: Vec<(u64, usize)> = Vec::with_capacity(num_rows); + let mut items: Vec<(u64, usize)> = Vec::with_capacity(self.num_rows); // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. let values_offset = values.offsets().as_slice(); let values_data_ptr = values.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); let mut offsets_ptr = offsets.as_mut_ptr(); let mut items_ptr = items.as_mut_ptr(); let mut data_size = 0; @@ -349,7 +523,7 @@ impl Column { unsafe { store_advance_aligned::(0, &mut offsets_ptr); let mut idx = 0; - let (mut slice, offset, mut length) = filter.as_slice(); + let (mut slice, offset, mut length) = self.filter.as_slice(); if offset > 0 { let mut mask = slice[0]; while mask != 0 { @@ -454,230 +628,4 @@ impl Column { BinaryColumn::new(data.into(), offsets.into()) } - - fn filter_string_scalars(values: &StringColumn, filter: &Bitmap) -> StringColumn { - unsafe { - StringColumn::from_binary_unchecked(Self::filter_binary_scalars( - &values.clone().into(), - filter, - )) - } - } - - /// # Safety - /// * `src` + `src_idx`(in bits) must be [valid] for reads of `len` bits. - /// * `ptr` must be [valid] for writes of `len` bits. - pub unsafe fn copy_continuous_bits( - ptr: &mut *mut u8, - src: &[u8], - mut dst_idx: usize, - mut src_idx: usize, - len: usize, - ) -> (u8, usize) { - let mut unset_bits = 0; - let chunks = BitChunks::new(src, src_idx, len); - chunks.iter().for_each(|chunk| { - unset_bits += chunk.count_zeros(); - copy_advance_aligned(&chunk as *const _ as *const u8, ptr, 8); - }); - - let mut remainder = chunks.remainder_len(); - dst_idx += len - remainder; - src_idx += len - remainder; - - let mut buf = 0; - while remainder > 0 { - if (*src.as_ptr().add(src_idx >> 3) & BIT_MASK[src_idx & 7]) != 0 { - buf |= BIT_MASK[dst_idx % 8]; - } else { - unset_bits += 1; - } - src_idx += 1; - dst_idx += 1; - remainder -= 1; - if dst_idx % 8 == 0 { - store_advance_aligned(buf, ptr); - buf = 0; - } - } - (buf, unset_bits as usize) - } - - /// low-level API using unsafe to improve performance. - fn filter_boolean_types(bitmap: &Bitmap, filter: &Bitmap) -> Bitmap { - debug_assert_eq!(bitmap.len(), filter.len()); - let num_rows = filter.len() - filter.unset_bits(); - if num_rows == bitmap.len() { - return bitmap.clone(); - } else if num_rows == 0 { - return Bitmap::new(); - } - // Fast path. - if num_rows <= bitmap.len() - && (bitmap.unset_bits() == 0 || bitmap.unset_bits() == bitmap.len()) - { - let mut bitmap = bitmap.clone(); - bitmap.slice(0, num_rows); - return bitmap; - } - - let capacity = num_rows.saturating_add(7) / 8; - let mut builder: Vec = Vec::with_capacity(capacity); - let mut builder_ptr = builder.as_mut_ptr(); - let mut builder_idx = 0; - let mut unset_bits = 0; - let mut buf = 0; - - let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice(); - let mut bitmap_idx = 0; - - let (mut filter_slice, filter_offset, mut filter_length) = filter.as_slice(); - unsafe { - if filter_offset > 0 { - let mut mask = filter_slice[0]; - while mask != 0 { - let n = mask.trailing_zeros() as usize; - // If `filter_length` > 0, the valid bits of this byte start at `filter_offset`, we also - // need to ensure that we cannot iterate more than `filter_length` bits. - if n >= filter_offset && n < filter_offset + filter_length { - if bitmap.get_bit_unchecked(n - filter_offset) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - } - mask = mask & (mask - 1); - } - let bits_to_align = 8 - filter_offset; - filter_length = if filter_length >= bits_to_align { - filter_length - bits_to_align - } else { - 0 - }; - filter_slice = &filter_slice[1..]; - bitmap_idx += bits_to_align; - } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(filter_slice, filter_length); - let mut continuous_selected = 0; - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - continuous_selected += CHUNK_SIZE; - } else { - if continuous_selected > 0 { - if builder_idx % 8 != 0 { - while continuous_selected > 0 { - if bitmap.get_bit_unchecked(bitmap_idx) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - bitmap_idx += 1; - builder_idx += 1; - continuous_selected -= 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - break; - } - } - } - - if continuous_selected > 0 { - let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits( - &mut builder_ptr, - bitmap_slice, - builder_idx, - bitmap_idx + bitmap_offset, - continuous_selected, - ); - builder_idx += continuous_selected; - bitmap_idx += continuous_selected; - unset_bits += cur_unset_bits; - buf = cur_buf; - continuous_selected = 0; - } - } - - while mask != 0 { - let n = mask.trailing_zeros() as usize; - if bitmap.get_bit_unchecked(bitmap_idx + n) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - } - mask = mask & (mask - 1); - } - bitmap_idx += CHUNK_SIZE; - } - } - - if continuous_selected > 0 { - if builder_idx % 8 != 0 { - while continuous_selected > 0 { - if bitmap.get_bit_unchecked(bitmap_idx) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - bitmap_idx += 1; - builder_idx += 1; - continuous_selected -= 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - break; - } - } - } - - if continuous_selected > 0 { - let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits( - &mut builder_ptr, - bitmap_slice, - builder_idx, - bitmap_idx + bitmap_offset, - continuous_selected, - ); - builder_idx += continuous_selected; - bitmap_idx += continuous_selected; - unset_bits += cur_unset_bits; - buf = cur_buf; - } - } - - for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { - if is_selected { - if bitmap.get_bit_unchecked(bitmap_idx + i) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - } - } - } - - if builder_idx % 8 != 0 { - store_advance_aligned(buf, &mut builder_ptr); - } - } - - unsafe { - set_vec_len_by_ptr(&mut builder, builder_ptr); - Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) - .ok() - .unwrap() - } - } } diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index b4cb346a4917..9c4c478834f4 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -12,26 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::Ordering; use std::sync::Arc; -use databend_common_arrow::arrow::array::ord as arrow_ord; -use databend_common_arrow::arrow::array::ord::DynComparator; -use databend_common_arrow::arrow::array::Array; -use databend_common_arrow::arrow::array::PrimitiveArray; -use databend_common_arrow::arrow::compute::sort as arrow_sort; -use databend_common_arrow::arrow::datatypes::DataType as ArrowType; -use databend_common_arrow::arrow::error::Error as ArrowError; -use databend_common_arrow::arrow::error::Result as ArrowResult; use databend_common_exception::Result; -use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_ARRAY; -use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_MAP; -use crate::converts::arrow2::ARROW_EXT_TYPE_VARIANT; use crate::types::DataType; -use crate::utils::arrow::column_to_arrow_array; use crate::visitor::ValueVisitor; -use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Scalar; @@ -131,97 +117,6 @@ impl DataBlock { let permutations = sort_compare.take_permutation(); DataBlock::take(block, &permutations, &mut None) } - - // TODO remove these - #[allow(dead_code)] - pub fn sort_old( - block: &DataBlock, - descriptions: &[SortColumnDescription], - limit: Option, - ) -> Result { - let num_rows = block.num_rows(); - if num_rows <= 1 { - return Ok(block.clone()); - } - let order_columns = descriptions - .iter() - .map(|d| column_to_arrow_array(block.get_by_offset(d.offset), num_rows)) - .collect::>(); - - let order_arrays = descriptions - .iter() - .zip(order_columns.iter()) - .map(|(d, array)| arrow_sort::SortColumn { - values: array.as_ref(), - options: Some(arrow_sort::SortOptions { - descending: !d.asc, - nulls_first: d.nulls_first, - }), - }) - .collect::>(); - - let indices: PrimitiveArray = - arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?; - DataBlock::take(block, indices.values(), &mut None) - } -} - -fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult { - let left = Column::from_arrow(left, &DataType::Variant) - .unwrap() - .as_variant() - .cloned() - .unwrap(); - let right = Column::from_arrow(right, &DataType::Variant) - .unwrap() - .as_variant() - .cloned() - .unwrap(); - Ok(Box::new(move |i, j| { - let l = unsafe { left.index_unchecked(i) }; - let r = unsafe { right.index_unchecked(j) }; - jsonb::compare(l, r).unwrap() - })) -} - -fn compare_null() -> ArrowResult { - Ok(Box::new(move |_, _| Ordering::Equal)) -} - -fn compare_decimal256(left: &dyn Array, right: &dyn Array) -> ArrowResult { - let left = left - .as_any() - .downcast_ref::>() - .unwrap() - .clone(); - let right = right - .as_any() - .downcast_ref::>() - .unwrap() - .clone(); - - Ok(Box::new(move |i, j| left.value(i).cmp(&right.value(j)))) -} - -fn build_compare(left: &dyn Array, right: &dyn Array) -> ArrowResult { - assert_eq!(left.data_type(), right.data_type()); - match left.data_type() { - ArrowType::Extension(name, _, _) => match name.as_str() { - ARROW_EXT_TYPE_VARIANT => compare_variant(left, right), - ARROW_EXT_TYPE_EMPTY_ARRAY | ARROW_EXT_TYPE_EMPTY_MAP => compare_null(), - _ => Err(ArrowError::NotYetImplemented(format!( - "Sort not supported for data type {:?}", - left.data_type() - ))), - }, - ArrowType::Null => compare_null(), - ArrowType::Decimal256(_, _) => compare_decimal256(left, right), - _ => arrow_ord::build_compare(left, right), - } } pub fn compare_scalars(rows: Vec>, data_types: &[DataType]) -> Result> { diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index 5ed4a9749a44..84d120b0390c 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -20,32 +20,12 @@ use databend_common_exception::Result; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; -use crate::types::array::ArrayColumn; -use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; -use crate::types::bitmap::BitmapType; -use crate::types::decimal::DecimalColumn; -use crate::types::geography::GeographyColumn; -use crate::types::geometry::GeometryType; -use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; -use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::AnyType; -use crate::types::ArgType; -use crate::types::ArrayType; -use crate::types::BinaryType; -use crate::types::BooleanType; -use crate::types::GeographyType; -use crate::types::MapType; -use crate::types::NumberType; -use crate::types::StringType; -use crate::types::ValueType; -use crate::types::VariantType; -use crate::with_decimal_type; -use crate::with_number_mapped_type; +use crate::types::*; +use crate::visitor::ValueVisitor; use crate::BlockEntry; -use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Value; @@ -65,19 +45,20 @@ impl DataBlock { return Ok(self.slice(0..0)); } + let mut taker = TakeVisitor::new(indices, string_items_buf); + let after_columns = self .columns() .iter() - .map(|entry| match &entry.value { - Value::Scalar(s) => { - BlockEntry::new(entry.data_type.clone(), Value::Scalar(s.clone())) - } - Value::Column(c) => BlockEntry::new( - entry.data_type.clone(), - Value::Column(Column::take(c, indices, string_items_buf)), - ), + .map(|entry| { + taker.visit_value(entry.value.clone())?; + let result = taker.result.take().unwrap(); + Ok(BlockEntry { + value: result, + data_type: entry.data_type.clone(), + }) }) - .collect(); + .collect::>>()?; Ok(DataBlock::new_with_meta( after_columns, @@ -87,223 +68,110 @@ impl DataBlock { } } -impl Column { - pub fn take(&self, indices: &[I], string_items_buf: &mut Option>) -> Self - where I: databend_common_arrow::arrow::types::Index { - match self { - Column::Null { .. } => Column::Null { len: indices.len() }, - Column::EmptyArray { .. } => Column::EmptyArray { len: indices.len() }, - Column::EmptyMap { .. } => Column::EmptyMap { len: indices.len() }, - Column::Number(column) => with_number_mapped_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => { - let builder = Self::take_primitive_types(values, indices); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - }), - Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { - DecimalColumn::DECIMAL_TYPE(values, size) => { - let builder = Self::take_primitive_types(values, indices); - Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) - } - }), - Column::Boolean(bm) => Column::Boolean(Self::take_boolean_types(bm, indices)), - Column::Binary(column) => BinaryType::upcast_column(Self::take_binary_types( - column, - indices, - string_items_buf.as_mut(), - )), - Column::String(column) => StringType::upcast_column(Self::take_string_types( - column, - indices, - string_items_buf.as_mut(), - )), - Column::Timestamp(column) => { - let builder = Self::take_primitive_types(column, indices); - let ts = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int64() - .unwrap(); - Column::Timestamp(ts) - } - Column::Date(column) => { - let builder = Self::take_primitive_types(column, indices); - let d = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int32() - .unwrap(); - Column::Date(d) - } - Column::Array(column) => { - let mut offsets = Vec::with_capacity(indices.len() + 1); - offsets.push(0); - let builder = ColumnBuilder::with_capacity(&column.values.data_type(), self.len()); - let builder = ArrayColumnBuilder { builder, offsets }; - Self::take_value_types::, _>(column, builder, indices) - } - Column::Map(column) => { - let mut offsets = Vec::with_capacity(indices.len() + 1); - offsets.push(0); - let builder = ColumnBuilder::from_column( - ColumnBuilder::with_capacity(&column.values.data_type(), self.len()).build(), - ); - let (key_builder, val_builder) = match builder { - ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()), - _ => unreachable!(), - }; - let builder = KvColumnBuilder { - keys: key_builder, - values: val_builder, - }; - let builder = ArrayColumnBuilder { builder, offsets }; - let column = ArrayColumn::try_downcast(column).unwrap(); - Self::take_value_types::, _>(&column, builder, indices) - } - Column::Bitmap(column) => BitmapType::upcast_column(Self::take_binary_types( - column, - indices, - string_items_buf.as_mut(), - )), - Column::Nullable(c) => { - let column = c.column.take(indices, string_items_buf); - let validity = Column::Boolean(Self::take_boolean_types(&c.validity, indices)); - Column::Nullable(Box::new(NullableColumn::new( - column, - BooleanType::try_downcast_column(&validity).unwrap(), - ))) - } - Column::Tuple(fields) => { - let fields = fields - .iter() - .map(|c| c.take(indices, string_items_buf)) - .collect(); - Column::Tuple(fields) - } - Column::Variant(column) => VariantType::upcast_column(Self::take_binary_types( - column, - indices, - string_items_buf.as_mut(), - )), - Column::Geometry(column) => GeometryType::upcast_column(Self::take_binary_types( - column, - indices, - string_items_buf.as_mut(), - )), - Column::Geography(column) => GeographyType::upcast_column(GeographyColumn( - Self::take_binary_types(&column.0, indices, string_items_buf.as_mut()), - )), +struct TakeVisitor<'a, I> +where I: databend_common_arrow::arrow::types::Index +{ + indices: &'a [I], + string_items_buf: &'a mut Option>, + result: Option>, +} + +impl<'a, I> TakeVisitor<'a, I> +where I: databend_common_arrow::arrow::types::Index +{ + fn new(indices: &'a [I], string_items_buf: &'a mut Option>) -> Self { + Self { + indices, + string_items_buf, + result: None, } } +} - pub fn take_primitive_types(col: &Buffer, indices: &[I]) -> Vec - where - T: Copy, - I: databend_common_arrow::arrow::types::Index, - { - let num_rows = indices.len(); - let mut builder: Vec = Vec::with_capacity(num_rows); - let col = col.as_slice(); - builder.extend( - indices - .iter() - .map(|index| unsafe { *col.get_unchecked(index.to_usize()) }), - ); - builder +impl<'a, I> ValueVisitor for TakeVisitor<'a, I> +where I: databend_common_arrow::arrow::types::Index +{ + fn visit_scalar(&mut self, scalar: crate::Scalar) -> Result<()> { + self.result = Some(Value::Scalar(scalar)); + Ok(()) } - pub fn take_binary_types( - col: &BinaryColumn, - indices: &[I], - string_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> BinaryColumn - where - I: databend_common_arrow::arrow::types::Index, - { - let num_rows = indices.len(); + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + self.visit_boolean(column.validity.clone())?; + let validity = + BooleanType::try_downcast_column(self.result.take().unwrap().as_column().unwrap()) + .unwrap(); - // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` - // can be reused, we will not re-allocate memory. - let mut items: Option> = match &string_items_buf { - Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, - _ => Some(Vec::with_capacity(num_rows)), - }; - let items = match items.is_some() { - true => items.as_mut().unwrap(), - false => string_items_buf.unwrap(), - }; + self.visit_column(column.column)?; + let result = self.result.take().unwrap(); + let result = result.as_column().unwrap(); + self.result = Some(Value::Column(NullableColumn::new_column( + result.clone(), + validity, + ))); + Ok(()) + } - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let col_offset = col.offsets().as_slice(); - let col_data_ptr = col.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; + fn visit_typed_column(&mut self, column: ::Column) -> Result<()> { + let c = T::upcast_column(column.clone()); + let builder = ColumnBuilder::with_capacity(&c.data_type(), c.len()); + let mut builder = T::try_downcast_owned_builder(builder).unwrap(); - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - items.set_len(num_rows); - offsets.set_len(num_rows + 1); - *offsets.get_unchecked_mut(0) = 0; - for (i, index) in indices.iter().enumerate() { - let start = *col_offset.get_unchecked(index.to_usize()) as usize; - let len = *col_offset.get_unchecked(index.to_usize() + 1) as usize - start; - data_size += len as u64; - *items.get_unchecked_mut(i) = (col_data_ptr.add(start) as u64, len); - *offsets.get_unchecked_mut(i + 1) = data_size; - } + for index in self.indices { + T::push_item(&mut builder, unsafe { + T::index_column_unchecked(&column, index.to_usize()) + }); } + self.result = Some(Value::Column(T::upcast_column(T::build_column(builder)))); + Ok(()) + } - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); + fn visit_number( + &mut self, + buffer: as ValueType>::Column, + ) -> Result<()> { + self.result = Some(Value::Column(NumberType::::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); - } - set_vec_len_by_ptr(&mut data, data_ptr); - } + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(TimestampType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } - BinaryColumn::new(data.into(), offsets.into()) + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(DateType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) } - pub fn take_string_types( - col: &StringColumn, - indices: &[I], - string_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> StringColumn - where - I: databend_common_arrow::arrow::types::Index, - { - unsafe { - StringColumn::from_binary_unchecked(Self::take_binary_types( - &col.clone().into(), - indices, - string_items_buf, - )) - } + fn visit_decimal( + &mut self, + buffer: Buffer, + size: DecimalSize, + ) -> Result<()> { + self.result = Some(Value::Column(T::upcast_column( + self.take_primitive_types(buffer), + size, + ))); + Ok(()) } - pub fn take_boolean_types(col: &Bitmap, indices: &[I]) -> Bitmap - where I: databend_common_arrow::arrow::types::Index { - let num_rows = indices.len(); + fn visit_boolean(&mut self, col: Bitmap) -> Result<()> { + let num_rows = self.indices.len(); // Fast path: avoid iterating column to generate a new bitmap. // If this [`Bitmap`] is all true or all false and `num_rows <= bitmap.len()``, // we can just slice it. if num_rows <= col.len() && (col.unset_bits() == 0 || col.unset_bits() == col.len()) { let mut bitmap = col.clone(); bitmap.slice(0, num_rows); - return bitmap; + self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); + return Ok(()); } let capacity = num_rows.saturating_add(7) / 8; @@ -312,7 +180,7 @@ impl Column { let mut value = 0; let mut i = 0; - for index in indices.iter() { + for index in self.indices.iter() { if col.get_bit(index.to_usize()) { value |= BIT_MASK[i % 8]; } else { @@ -328,26 +196,97 @@ impl Column { builder.push(value); } - unsafe { + let result = unsafe { Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) .ok() .unwrap() - } + }; + self.result = Some(Value::Column(BooleanType::upcast_column(result))); + Ok(()) } - fn take_value_types( - col: &T::Column, - mut builder: T::ColumnBuilder, - indices: &[I], - ) -> Column - where - I: databend_common_arrow::arrow::types::Index, - { - for index in indices { - T::push_item(&mut builder, unsafe { - T::index_column_unchecked(col, index.to_usize()) - }); + fn visit_binary(&mut self, col: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(BinaryType::upcast_column( + self.take_binary_types(&col), + ))); + Ok(()) + } + + fn visit_string(&mut self, column: StringColumn) -> Result<()> { + let column: BinaryColumn = column.into(); + self.result = Some(Value::Column(StringType::upcast_column(unsafe { + StringColumn::from_binary_unchecked(self.take_binary_types(&column)) + }))); + Ok(()) + } + + fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(VariantType::upcast_column( + self.take_binary_types(&column), + ))); + Ok(()) + } +} + +impl<'a, I> TakeVisitor<'a, I> +where I: databend_common_arrow::arrow::types::Index +{ + fn take_primitive_types(&mut self, buffer: Buffer) -> Buffer { + let col = buffer.as_slice(); + let result: Vec = self + .indices + .iter() + .map(|index| unsafe { *col.get_unchecked(index.to_usize()) }) + .collect(); + result.into() + } + + fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { + let num_rows = self.indices.len(); + + // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` + // can be reused, we will not re-allocate memory. + let mut items: Option> = match &self.string_items_buf { + Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, + _ => Some(Vec::with_capacity(num_rows)), + }; + let items = match items.is_some() { + true => items.as_mut().unwrap(), + false => self.string_items_buf.as_mut().unwrap(), + }; + + // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. + let col_offset = col.offsets().as_slice(); + let col_data_ptr = col.data().as_slice().as_ptr(); + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + items.set_len(num_rows); + offsets.set_len(num_rows + 1); + *offsets.get_unchecked_mut(0) = 0; + for (i, index) in self.indices.iter().enumerate() { + let start = *col_offset.get_unchecked(index.to_usize()) as usize; + let len = *col_offset.get_unchecked(index.to_usize() + 1) as usize - start; + data_size += len as u64; + *items.get_unchecked_mut(i) = (col_data_ptr.add(start) as u64, len); + *offsets.get_unchecked_mut(i + 1) = data_size; + } + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for (str_ptr, len) in items.iter() { + copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + } + set_vec_len_by_ptr(&mut data, data_ptr); } - T::upcast_column(T::build_column(builder)) + + BinaryColumn::new(data.into(), offsets.into()) } } diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index 6cdf8ff405e9..34b0d2598fb9 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -35,23 +35,7 @@ use crate::types::nullable::NullableColumn; use crate::types::nullable::NullableColumnVec; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::AnyType; -use crate::types::ArgType; -use crate::types::ArrayType; -use crate::types::BinaryType; -use crate::types::BooleanType; -use crate::types::DataType; -use crate::types::DateType; -use crate::types::GeographyType; -use crate::types::MapType; -use crate::types::NumberColumnVec; -use crate::types::NumberType; -use crate::types::StringType; -use crate::types::TimestampType; -use crate::types::ValueType; -use crate::types::VariantType; -use crate::types::F32; -use crate::types::F64; +use crate::types::*; use crate::with_decimal_type; use crate::with_number_mapped_type; use crate::BlockEntry; diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index db29f944e605..a2f97b894956 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -18,31 +18,12 @@ use databend_common_exception::Result; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; -use crate::types::array::ArrayColumn; -use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; -use crate::types::bitmap::BitmapType; -use crate::types::decimal::DecimalColumn; -use crate::types::geometry::GeometryType; -use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; -use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::AnyType; -use crate::types::ArgType; -use crate::types::ArrayType; -use crate::types::BinaryType; -use crate::types::BooleanType; -use crate::types::GeographyType; -use crate::types::MapType; -use crate::types::NumberType; -use crate::types::StringType; -use crate::types::ValueType; -use crate::types::VariantType; -use crate::with_decimal_type; -use crate::with_number_mapped_type; +use crate::types::*; +use crate::visitor::ValueVisitor; use crate::BlockEntry; -use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Value; @@ -60,20 +41,19 @@ impl DataBlock { num_rows ); + let mut taker = TakeCompactVisitor::new(indices, num_rows); let after_columns = self .columns() .iter() - .map(|entry| match &entry.value { - Value::Scalar(s) => BlockEntry { + .map(|entry| { + taker.visit_value(entry.value.clone())?; + let result = taker.result.take().unwrap(); + Ok(BlockEntry { + value: result, data_type: entry.data_type.clone(), - value: Value::Scalar(s.clone()), - }, - Value::Column(c) => BlockEntry { - data_type: entry.data_type.clone(), - value: Value::Column(Column::take_compacted_indices(c, indices, num_rows)), - }, + }) }) - .collect(); + .collect::>>()?; Ok(DataBlock::new_with_meta( after_columns, @@ -83,133 +63,129 @@ impl DataBlock { } } -impl Column { - pub fn take_compacted_indices(&self, indices: &[(u32, u32)], num_rows: usize) -> Self { - match self { - Column::Null { .. } => Column::Null { len: num_rows }, - Column::EmptyArray { .. } => Column::EmptyArray { len: num_rows }, - Column::EmptyMap { .. } => Column::EmptyMap { len: num_rows }, - Column::Number(column) => with_number_mapped_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => { - let builder = Self::take_compacted_primitive_types(values, indices, num_rows); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - }), - Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { - DecimalColumn::DECIMAL_TYPE(values, size) => { - let builder = Self::take_compacted_primitive_types(values, indices, num_rows); - Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) - } - }), - Column::Boolean(bm) => { - Self::take_compacted_arg_types::(bm, indices, num_rows) - } - Column::Binary(column) => BinaryType::upcast_column(Self::take_compact_binary_types( - column, indices, num_rows, - )), - Column::String(column) => StringType::upcast_column(Self::take_compact_string_types( - column, indices, num_rows, - )), - Column::Timestamp(column) => { - let builder = Self::take_compacted_primitive_types(column, indices, num_rows); - let ts = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int64() - .unwrap(); - Column::Timestamp(ts) - } - Column::Date(column) => { - let builder = Self::take_compacted_primitive_types(column, indices, num_rows); - let d = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int32() +struct TakeCompactVisitor<'a> { + indices: &'a [(u32, u32)], + num_rows: usize, + result: Option>, +} + +impl<'a> TakeCompactVisitor<'a> { + fn new(indices: &'a [(u32, u32)], num_rows: usize) -> Self { + Self { + indices, + num_rows, + result: None, + } + } +} + +impl<'a> ValueVisitor for TakeCompactVisitor<'a> { + fn visit_scalar(&mut self, scalar: crate::Scalar) -> Result<()> { + self.result = Some(Value::Scalar(scalar)); + Ok(()) + } + + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + self.visit_boolean(column.validity.clone())?; + let validity = + BooleanType::try_downcast_column(self.result.take().unwrap().as_column().unwrap()) .unwrap(); - Column::Date(d) - } - Column::Geography(column) => { - Self::take_compacted_arg_types::(column, indices, num_rows) - } - Column::Array(column) => { - let mut offsets = Vec::with_capacity(num_rows + 1); - offsets.push(0); - let builder = ColumnBuilder::with_capacity(&column.values.data_type(), num_rows); - let builder = ArrayColumnBuilder { builder, offsets }; - Self::take_compacted_value_types::>(column, builder, indices) - } - Column::Map(column) => { - let mut offsets = Vec::with_capacity(num_rows + 1); - offsets.push(0); - let builder = ColumnBuilder::from_column( - ColumnBuilder::with_capacity(&column.values.data_type(), num_rows).build(), - ); - let (key_builder, val_builder) = match builder { - ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()), - _ => unreachable!(), - }; - let builder = KvColumnBuilder { - keys: key_builder, - values: val_builder, - }; - let builder = ArrayColumnBuilder { builder, offsets }; - let column = ArrayColumn::try_downcast(column).unwrap(); - Self::take_compacted_value_types::>( - &column, builder, indices, - ) - } - Column::Bitmap(column) => BitmapType::upcast_column(Self::take_compact_binary_types( - column, indices, num_rows, - )), - Column::Nullable(c) => { - let column = c.column.take_compacted_indices(indices, num_rows); - let validity = - Self::take_compacted_arg_types::(&c.validity, indices, num_rows); - Column::Nullable(Box::new(NullableColumn::new( - column, - BooleanType::try_downcast_column(&validity).unwrap(), - ))) - } - Column::Tuple(fields) => { - let fields = fields - .iter() - .map(|c| c.take_compacted_indices(indices, num_rows)) - .collect(); - Column::Tuple(fields) + + self.visit_column(column.column)?; + let result = self.result.take().unwrap(); + let result = result.as_column().unwrap(); + self.result = Some(Value::Column(NullableColumn::new_column( + result.clone(), + validity, + ))); + Ok(()) + } + + fn visit_typed_column(&mut self, column: ::Column) -> Result<()> { + let c = T::upcast_column(column.clone()); + let builder = ColumnBuilder::with_capacity(&c.data_type(), c.len()); + let mut builder = T::try_downcast_owned_builder(builder).unwrap(); + + for (index, cnt) in self.indices { + for _ in 0..*cnt { + T::push_item(&mut builder, unsafe { + T::index_column_unchecked(&column, *index as usize) + }); } - Column::Variant(column) => VariantType::upcast_column(Self::take_compact_binary_types( - column, indices, num_rows, - )), - Column::Geometry(column) => GeometryType::upcast_column( - Self::take_compact_binary_types(column, indices, num_rows), - ), } + self.result = Some(Value::Column(T::upcast_column(T::build_column(builder)))); + + Ok(()) } - pub fn take_compacted_primitive_types( - col: &Buffer, - indices: &[(u32, u32)], - num_rows: usize, - ) -> Vec - where - T: Copy, - { - let col_ptr = col.as_slice().as_ptr(); - let mut builder: Vec = Vec::with_capacity(num_rows); + fn visit_number( + &mut self, + buffer: as ValueType>::Column, + ) -> Result<()> { + self.result = Some(Value::Column(NumberType::::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(TimestampType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(DateType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } + + fn visit_decimal( + &mut self, + buffer: Buffer, + size: DecimalSize, + ) -> Result<()> { + self.result = Some(Value::Column(T::upcast_column( + self.take_primitive_types(buffer), + size, + ))); + Ok(()) + } + + fn visit_binary(&mut self, col: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(BinaryType::upcast_column( + self.take_binary_types(&col), + ))); + Ok(()) + } + + fn visit_string(&mut self, column: StringColumn) -> Result<()> { + let column: BinaryColumn = column.into(); + self.result = Some(Value::Column(StringType::upcast_column(unsafe { + StringColumn::from_binary_unchecked(self.take_binary_types(&column)) + }))); + Ok(()) + } + + fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(VariantType::upcast_column( + self.take_binary_types(&column), + ))); + Ok(()) + } +} + +impl<'a> TakeCompactVisitor<'a> { + fn take_primitive_types(&mut self, buffer: Buffer) -> Buffer { + let col_ptr = buffer.as_slice().as_ptr(); + let mut builder: Vec = Vec::with_capacity(self.num_rows); let mut ptr = builder.as_mut_ptr(); let mut remain; unsafe { - for (index, cnt) in indices.iter() { + for (index, cnt) in self.indices.iter() { if *cnt == 1 { copy_advance_aligned(col_ptr.add(*index as usize), &mut ptr, 1); continue; @@ -238,28 +214,24 @@ impl Column { set_vec_len_by_ptr(&mut builder, ptr); } - builder + builder.into() } - pub fn take_compact_binary_types( - col: &BinaryColumn, - indices: &[(u32, u32)], - num_rows: usize, - ) -> BinaryColumn { + fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { // Each element of `items` is (string(&[u8]), repeat times). - let mut items = Vec::with_capacity(indices.len()); + let mut items = Vec::with_capacity(self.indices.len()); let mut items_ptr = items.as_mut_ptr(); // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets = Vec::with_capacity(num_rows + 1); + let mut offsets = Vec::with_capacity(self.num_rows + 1); let mut offsets_ptr = offsets.as_mut_ptr(); let mut data_size = 0; // Build [`offset`] and calculate `data_size` required by [`data`]. unsafe { store_advance_aligned::(0, &mut offsets_ptr); - for (index, cnt) in indices.iter() { + for (index, cnt) in self.indices.iter() { let item = col.index_unchecked(*index as usize); store_advance_aligned((item, *cnt), &mut items_ptr); for _ in 0..*cnt { @@ -310,50 +282,4 @@ impl Column { BinaryColumn::new(data.into(), offsets.into()) } - - pub fn take_compact_string_types( - col: &StringColumn, - indices: &[(u32, u32)], - num_rows: usize, - ) -> StringColumn { - unsafe { - StringColumn::from_binary_unchecked(Self::take_compact_binary_types( - &col.clone().into(), - indices, - num_rows, - )) - } - } - - fn take_compacted_arg_types( - col: &T::Column, - indices: &[(u32, u32)], - num_rows: usize, - ) -> Column { - let mut builder = T::create_builder(num_rows, &[]); - for (index, cnt) in indices { - for _ in 0..*cnt { - T::push_item(&mut builder, unsafe { - T::index_column_unchecked(col, *index as usize) - }); - } - } - let column = T::build_column(builder); - T::upcast_column(column) - } - - fn take_compacted_value_types( - col: &T::Column, - mut builder: T::ColumnBuilder, - indices: &[(u32, u32)], - ) -> Column { - for (index, cnt) in indices { - for _ in 0..*cnt { - T::push_item(&mut builder, unsafe { - T::index_column_unchecked(col, *index as usize) - }); - } - } - T::upcast_column(T::build_column(builder)) - } } diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index a5dcda765d78..71fbd0acc1af 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -19,28 +19,17 @@ use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; +use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; -use crate::types::array::ArrayColumn; -use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; -use crate::types::decimal::DecimalColumn; -use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; -use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::AnyType; -use crate::types::ArgType; -use crate::types::ArrayType; -use crate::types::GeographyType; -use crate::types::MapType; -use crate::types::ValueType; -use crate::with_decimal_type; -use crate::with_number_type; +use crate::types::*; +use crate::visitor::ValueVisitor; use crate::BlockEntry; -use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Value; @@ -56,216 +45,121 @@ impl DataBlock { num_rows ); - let columns = self + let mut taker = TakeRangeVisitor::new(ranges, num_rows); + let after_columns = self .columns() .iter() - .map(|entry| match &entry.value { - Value::Column(c) => { - let value = Value::Column(Column::take_ranges(c, ranges, num_rows)); - BlockEntry::new(entry.data_type.clone(), value) - } - _ => entry.clone(), + .map(|entry| { + taker.visit_value(entry.value.clone())?; + let result = taker.result.take().unwrap(); + Ok(BlockEntry { + value: result, + data_type: entry.data_type.clone(), + }) }) - .collect(); - Ok(DataBlock::new(columns, num_rows)) + .collect::>>()?; + + Ok(DataBlock::new_with_meta( + after_columns, + num_rows, + self.get_meta().cloned(), + )) } } -impl Column { - // Generate a new `Column` by the specified indices ranges. - fn take_ranges(&self, ranges: &[Range], num_rows: usize) -> Column { - match self { - Column::Null { .. } => Column::Null { len: num_rows }, - Column::EmptyArray { .. } => Column::EmptyArray { len: num_rows }, - Column::EmptyMap { .. } => Column::EmptyMap { len: num_rows }, - Column::Number(column) => with_number_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => { - Column::Number(NumberColumn::NUM_TYPE(Self::take_ranges_primitive_types( - values, ranges, num_rows, - ))) - } - }), - Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { - DecimalColumn::DECIMAL_TYPE(values, size) => { - Column::Decimal(DecimalColumn::DECIMAL_TYPE( - Self::take_ranges_primitive_types(values, ranges, num_rows), - *size, - )) - } - }), - Column::Boolean(bm) => { - let column = Self::take_ranges_boolean_types(bm, ranges, num_rows); - Column::Boolean(column) - } - Column::Binary(column) => { - let column = Self::take_ranges_binary_types(column, ranges, num_rows); - Column::Binary(column) - } - Column::String(column) => { - let column = Self::take_ranges_string_types(column, ranges, num_rows); - Column::String(column) - } - Column::Timestamp(column) => { - let ts = Self::take_ranges_primitive_types(column, ranges, num_rows); - Column::Timestamp(ts) - } - Column::Date(column) => { - let d = Self::take_ranges_primitive_types(column, ranges, num_rows); - Column::Date(d) - } - Column::Array(column) => { - let mut offsets = Vec::with_capacity(num_rows + 1); - offsets.push(0); - let builder = ColumnBuilder::with_capacity(&column.values.data_type(), num_rows); - let builder = ArrayColumnBuilder { builder, offsets }; - Self::take_ranges_scalar_types::>( - column, builder, ranges, num_rows, - ) - } - Column::Map(column) => { - let mut offsets = Vec::with_capacity(num_rows + 1); - offsets.push(0); - let builder = ColumnBuilder::from_column( - ColumnBuilder::with_capacity(&column.values.data_type(), num_rows).build(), - ); - let (key_builder, val_builder) = match builder { - ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()), - _ => unreachable!(), - }; - let builder = KvColumnBuilder { - keys: key_builder, - values: val_builder, - }; - let builder = ArrayColumnBuilder { builder, offsets }; - let column = ArrayColumn::try_downcast(column).unwrap(); - Self::take_ranges_scalar_types::>( - &column, builder, ranges, num_rows, - ) - } - Column::Bitmap(column) => { - let column = Self::take_ranges_binary_types(column, ranges, num_rows); - Column::Bitmap(column) - } +struct TakeRangeVisitor<'a> { + ranges: &'a [Range], + num_rows: usize, + result: Option>, +} - Column::Nullable(c) => { - let column = Self::take_ranges(&c.column, ranges, num_rows); - let validity = Self::take_ranges_boolean_types(&c.validity, ranges, num_rows); - NullableColumn::new_column(column, validity) - } - Column::Tuple(fields) => { - let fields = fields - .iter() - .map(|c| c.take_ranges(ranges, num_rows)) - .collect(); - Column::Tuple(fields) - } - Column::Variant(column) => { - let column = Self::take_ranges_binary_types(column, ranges, num_rows); - Column::Variant(column) - } - Column::Geometry(column) => { - let column = Self::take_ranges_binary_types(column, ranges, num_rows); - Column::Geometry(column) - } - Column::Geography(column) => { - let mut builder = GeographyType::create_builder(num_rows, &[]); - for range in ranges { - GeographyType::append_column( - &mut builder, - &column.slice(range.start as usize..range.end as usize), - ) - } - GeographyType::upcast_column(GeographyType::build_column(builder)) - } +impl<'a> TakeRangeVisitor<'a> { + fn new(ranges: &'a [Range], num_rows: usize) -> Self { + Self { + ranges, + num_rows, + result: None, } } +} - fn take_ranges_scalar_types( - col: &T::Column, - mut builder: T::ColumnBuilder, - ranges: &[Range], - _num_rows: usize, - ) -> Column { - for range in ranges { - for index in range.start as usize..range.end as usize { - T::push_item(&mut builder, unsafe { - T::index_column_unchecked(col, index) - }); - } - } - T::upcast_column(T::build_column(builder)) +impl<'a> ValueVisitor for TakeRangeVisitor<'a> { + fn visit_scalar(&mut self, scalar: crate::Scalar) -> Result<()> { + self.result = Some(Value::Scalar(scalar)); + Ok(()) } - fn take_ranges_primitive_types( - values: &Buffer, - ranges: &[Range], - num_rows: usize, - ) -> Buffer { - let mut builder: Vec = Vec::with_capacity(num_rows); - for range in ranges { - builder.extend(&values[range.start as usize..range.end as usize]); - } - builder.into() + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + self.visit_boolean(column.validity.clone())?; + let validity = + BooleanType::try_downcast_column(self.result.take().unwrap().as_column().unwrap()) + .unwrap(); + + self.visit_column(column.column)?; + let result = self.result.take().unwrap(); + let result = result.as_column().unwrap(); + self.result = Some(Value::Column(NullableColumn::new_column( + result.clone(), + validity, + ))); + Ok(()) } - fn take_ranges_binary_types( - values: &BinaryColumn, - ranges: &[Range], - num_rows: usize, - ) -> BinaryColumn { - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; + fn visit_typed_column(&mut self, column: ::Column) -> Result<()> { + let c = T::upcast_column(column.clone()); + let builder = ColumnBuilder::with_capacity(&c.data_type(), c.len()); + let mut builder = T::try_downcast_owned_builder(builder).unwrap(); - let value_data = values.data().as_slice(); - let values_offset = values.offsets().as_slice(); - // Build [`offset`] and calculate `data_size` required by [`data`]. - offsets.push(0); - for range in ranges { - let mut offset_start = values_offset[range.start as usize]; - for offset_end in values_offset[range.start as usize + 1..range.end as usize + 1].iter() - { - data_size += offset_end - offset_start; - offset_start = *offset_end; - offsets.push(data_size); + for range in self.ranges { + for index in range.start as usize..range.end as usize { + T::push_item(&mut builder, unsafe { + T::index_column_unchecked(&column, index) + }); } } + self.result = Some(Value::Column(T::upcast_column(T::build_column(builder)))); - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); + Ok(()) + } - unsafe { - for range in ranges { - let col_data = &value_data[values_offset[range.start as usize] as usize - ..values_offset[range.end as usize] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); - } - set_vec_len_by_ptr(&mut data, data_ptr); - } + fn visit_number( + &mut self, + buffer: as ValueType>::Column, + ) -> Result<()> { + self.result = Some(Value::Column(NumberType::::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) + } - BinaryColumn::new(data.into(), offsets.into()) + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(TimestampType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) } - fn take_ranges_string_types( - values: &StringColumn, - ranges: &[Range], - num_rows: usize, - ) -> StringColumn { - unsafe { - StringColumn::from_binary_unchecked(Self::take_ranges_binary_types( - &values.clone().into(), - ranges, - num_rows, - )) - } + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.result = Some(Value::Column(DateType::upcast_column( + self.take_primitive_types(buffer), + ))); + Ok(()) } - fn take_ranges_boolean_types( - bitmap: &Bitmap, - ranges: &[Range], - num_rows: usize, - ) -> Bitmap { - let capacity = num_rows.saturating_add(7) / 8; + fn visit_decimal( + &mut self, + buffer: Buffer, + size: DecimalSize, + ) -> Result<()> { + self.result = Some(Value::Column(T::upcast_column( + self.take_primitive_types(buffer), + size, + ))); + Ok(()) + } + + fn visit_boolean(&mut self, bitmap: Bitmap) -> Result<()> { + let capacity = self.num_rows.saturating_add(7) / 8; let mut builder: Vec = Vec::with_capacity(capacity); let mut builder_ptr = builder.as_mut_ptr(); let mut builder_idx = 0; @@ -274,7 +168,7 @@ impl Column { let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice(); unsafe { - for range in ranges { + for range in self.ranges { let mut start = range.start as usize; let end = range.end as usize; if builder_idx % 8 != 0 { @@ -295,7 +189,7 @@ impl Column { } let remaining = end - start; if remaining > 0 { - let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits( + let (cur_buf, cur_unset_bits) = copy_continuous_bits( &mut builder_ptr, bitmap_slice, builder_idx, @@ -313,9 +207,78 @@ impl Column { } set_vec_len_by_ptr(&mut builder, builder_ptr); - Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) + let bitmap = Bitmap::from_inner(Arc::new(builder.into()), 0, self.num_rows, unset_bits) .ok() - .unwrap() + .unwrap(); + self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); + Ok(()) + } + } + + fn visit_binary(&mut self, col: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(BinaryType::upcast_column( + self.take_binary_types(&col), + ))); + Ok(()) + } + + fn visit_string(&mut self, column: StringColumn) -> Result<()> { + let column: BinaryColumn = column.into(); + self.result = Some(Value::Column(StringType::upcast_column(unsafe { + StringColumn::from_binary_unchecked(self.take_binary_types(&column)) + }))); + Ok(()) + } + + fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> { + self.result = Some(Value::Column(VariantType::upcast_column( + self.take_binary_types(&column), + ))); + Ok(()) + } +} + +impl<'a> TakeRangeVisitor<'a> { + fn take_primitive_types(&mut self, buffer: Buffer) -> Buffer { + let mut builder: Vec = Vec::with_capacity(self.num_rows); + let values = buffer.as_slice(); + for range in self.ranges { + builder.extend(&values[range.start as usize..range.end as usize]); } + builder.into() + } + + fn take_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { + let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); + let mut data_size = 0; + + let value_data = values.data().as_slice(); + let values_offset = values.offsets().as_slice(); + // Build [`offset`] and calculate `data_size` required by [`data`]. + offsets.push(0); + for range in self.ranges { + let mut offset_start = values_offset[range.start as usize]; + for offset_end in values_offset[range.start as usize + 1..range.end as usize + 1].iter() + { + data_size += offset_end - offset_start; + offset_start = *offset_end; + offsets.push(data_size); + } + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for range in self.ranges { + let col_data = &value_data[values_offset[range.start as usize] as usize + ..values_offset[range.end as usize] as usize]; + copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); + } + set_vec_len_by_ptr(&mut data, data_ptr); + } + + BinaryColumn::new(data.into(), offsets.into()) } } diff --git a/src/query/expression/src/kernels/utils.rs b/src/query/expression/src/kernels/utils.rs index d74bb15ba2d9..0e37bcb4e470 100644 --- a/src/query/expression/src/kernels/utils.rs +++ b/src/query/expression/src/kernels/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::take::BIT_MASK; + /// # Safety /// /// * `ptr` must be [valid] for writes of `size_of::()` bytes. @@ -54,6 +56,45 @@ pub unsafe fn copy_advance_aligned(src: *const T, ptr: &mut *mut T, count: us } } +/// # Safety +/// * `src` + `src_idx`(in bits) must be [valid] for reads of `len` bits. +/// * `ptr` must be [valid] for writes of `len` bits. +pub unsafe fn copy_continuous_bits( + ptr: &mut *mut u8, + src: &[u8], + mut dst_idx: usize, + mut src_idx: usize, + len: usize, +) -> (u8, usize) { + let mut unset_bits = 0; + let chunks = BitChunks::new(src, src_idx, len); + chunks.iter().for_each(|chunk| { + unset_bits += chunk.count_zeros(); + copy_advance_aligned(&chunk as *const _ as *const u8, ptr, 8); + }); + + let mut remainder = chunks.remainder_len(); + dst_idx += len - remainder; + src_idx += len - remainder; + + let mut buf = 0; + while remainder > 0 { + if (*src.as_ptr().add(src_idx >> 3) & BIT_MASK[src_idx & 7]) != 0 { + buf |= BIT_MASK[dst_idx % 8]; + } else { + unset_bits += 1; + } + src_idx += 1; + dst_idx += 1; + remainder -= 1; + if dst_idx % 8 == 0 { + store_advance_aligned(buf, ptr); + buf = 0; + } + } + (buf, unset_bits as usize) +} + /// # Safety /// /// * `(ptr as usize - vec.as_ptr() as usize) / std::mem::size_of::()` must be diff --git a/src/query/expression/src/utils/visitor.rs b/src/query/expression/src/utils/visitor.rs index c3f7929b4f08..535383ce7968 100755 --- a/src/query/expression/src/utils/visitor.rs +++ b/src/query/expression/src/utils/visitor.rs @@ -43,7 +43,7 @@ pub trait ValueVisitor { self.visit_typed_column::>(column) } - fn visit_decimal(&mut self, column: Buffer) -> Result<()> { + fn visit_decimal(&mut self, column: Buffer, _size: DecimalSize) -> Result<()> { self.visit_typed_column::>(column) } @@ -120,7 +120,7 @@ pub trait ValueVisitor { } Column::Decimal(column) => { with_decimal_type!(|DECIMAL_TYPE| match column { - DecimalColumn::DECIMAL_TYPE(b, _) => self.visit_decimal(b), + DecimalColumn::DECIMAL_TYPE(b, size) => self.visit_decimal(b, size), }) } Column::Boolean(bitmap) => self.visit_boolean(bitmap), diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index f9fc04d2dc78..9c72d7d6ab28 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -15,7 +15,6 @@ use std::vec; use databend_common_exception::Result; -use databend_common_expression::block_debug::assert_block_value_eq; use databend_common_expression::types::decimal::*; use databend_common_expression::types::number::*; use databend_common_expression::types::StringType; @@ -198,11 +197,6 @@ fn test_block_sort() -> Result<()> { entry.value ); } - - // test new sort algorithm - let res = DataBlock::sort_old(&decimal_block, &sort_descs, Some(decimal_block.num_rows()))?; - let res_new = DataBlock::sort(&decimal_block, &sort_descs, None)?; - assert_block_value_eq(&res, &res_new); } Ok(())