From c5f932966e69d628426f78c9d633b1f21a4f638a Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 21 Sep 2024 12:59:05 +0800 Subject: [PATCH] feat(query): parallel k way merge sort (#16340) * merge_path Signed-off-by: coldWater * KWaySortPartition Signed-off-by: coldWater * KWayMergeWorkerProcessor Signed-off-by: coldWater * stream Signed-off-by: coldWater * BlockStream Signed-off-by: coldWater * update Signed-off-by: coldWater * event Signed-off-by: coldWater * builder Signed-off-by: coldWater * settings Signed-off-by: coldWater * clean Signed-off-by: coldWater * fix Signed-off-by: coldWater * bypass some task Signed-off-by: coldWater * test Signed-off-by: coldWater * refine Signed-off-by: coldWater * enable Signed-off-by: coldWater * list domain Signed-off-by: coldWater * refine Signed-off-by: coldWater * integrate Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix Signed-off-by: coldWater * search per iter Signed-off-by: coldWater * fix Signed-off-by: coldWater * use block meta Signed-off-by: coldWater * rename Signed-off-by: coldWater * fix Signed-off-by: coldWater * factor Signed-off-by: coldWater * bump quinn Signed-off-by: coldWater * fix test Signed-off-by: coldWater * fix test * fix test Signed-off-by: coldWater * fix Signed-off-by: coldWater * with_block_size_hit Signed-off-by: coldWater * fix Signed-off-by: coldWater --------- Signed-off-by: coldWater Co-authored-by: zhyass Co-authored-by: sundyli <543950155@qq.com> --- Cargo.lock | 10 +- src/query/pipeline/transforms/src/lib.rs | 1 + .../src/processors/transforms/mod.rs | 4 +- .../src/processors/transforms/sort/cursor.rs | 10 +- .../sort/k_way_merge_sort_partition.rs | 258 ++++++ .../processors/transforms/sort/list_domain.rs | 589 +++++++++++++ .../src/processors/transforms/sort/merger.rs | 4 +- .../src/processors/transforms/sort/mod.rs | 4 + .../processors/transforms/sort/rows/common.rs | 6 + .../processors/transforms/sort/rows/mod.rs | 16 +- .../processors/transforms/sort/rows/simple.rs | 61 +- .../transforms/transform_k_way_merge_sort.rs | 785 ++++++++++++++++++ .../transforms/transform_multi_sort_merge.rs | 16 +- .../pipelines/builders/builder_recluster.rs | 19 +- .../src/pipelines/builders/builder_sort.rs | 97 ++- src/query/service/tests/it/pipelines/mod.rs | 1 + .../tests/it/pipelines/transforms/mod.rs | 15 + .../tests/it/pipelines/transforms/sort.rs | 285 +++++++ src/query/settings/src/settings_default.rs | 6 + .../settings/src/settings_getter_setter.rs | 4 + .../09_0008_fuse_optimize_table.test | 13 +- .../09_0016_remote_alter_recluster.test | 4 +- .../20_0013_query_result_cache.test | 3 + .../mode/standalone/explain/clustering.test | 4 +- .../suites/mode/standalone/explain/sort.test | 64 +- .../mode/standalone/explain/window.test | 3 + 26 files changed, 2136 insertions(+), 146 deletions(-) create mode 100644 src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs create mode 100644 src/query/pipeline/transforms/src/processors/transforms/sort/list_domain.rs create mode 100644 src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs create mode 100644 src/query/service/tests/it/pipelines/transforms/mod.rs create mode 100644 src/query/service/tests/it/pipelines/transforms/sort.rs diff --git a/Cargo.lock b/Cargo.lock index 9ce24f40e271..b00a339653d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12517,7 +12517,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.3", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", 
"pyo3-ffi", @@ -12644,9 +12644,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.3" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" dependencies = [ "bytes", "pin-project-lite", @@ -15900,8 +15900,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", - "rand 0.8.5", + "cfg-if 0.1.10", + "rand 0.7.3", "static_assertions", ] diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs index 7f05fd1aee71..23c89473f7db 100644 --- a/src/query/pipeline/transforms/src/lib.rs +++ b/src/query/pipeline/transforms/src/lib.rs @@ -17,5 +17,6 @@ #![feature(core_intrinsics)] #![feature(int_roundings)] #![feature(let_chains)] +#![feature(iter_map_windows)] pub mod processors; diff --git a/src/query/pipeline/transforms/src/processors/transforms/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/mod.rs index 708181aca6b3..ec6ca0faf96a 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/mod.rs @@ -22,14 +22,15 @@ mod transform_compact_block; mod transform_compact_builder; mod transform_compact_no_split_builder; mod transform_dummy; +mod transform_k_way_merge_sort; mod transform_multi_sort_merge; mod transform_pipeline_helper; mod transform_retry_async; mod transform_sort_merge; mod transform_sort_merge_base; mod transform_sort_merge_limit; - pub mod transform_sort_partial; + pub use transform::*; pub use transform_accumulating::*; pub use transform_accumulating_async::*; @@ -39,6 +40,7 @@ pub use transform_compact_block::*; pub use transform_compact_builder::*; pub use transform_compact_no_split_builder::*; pub use transform_dummy::*; +pub use transform_k_way_merge_sort::*; pub use transform_multi_sort_merge::try_add_multi_sort_merge; pub use transform_pipeline_helper::TransformPipelineHelper; pub use transform_retry_async::*; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs index e1dad9a908a7..40a81517e86d 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs @@ -27,7 +27,6 @@ where pub input_index: usize, pub row_index: usize, - num_rows: usize, _o: PhantomData, /// rows within [`Cursor`] should be monotonic. 
@@ -52,7 +51,7 @@ where #[inline] pub fn is_finished(&self) -> bool { - self.num_rows == self.row_index + self.rows.len() == self.row_index } #[inline] @@ -62,12 +61,12 @@ where #[inline] pub fn last(&self) -> R::Item<'_> { - self.rows.row(self.num_rows - 1) + self.rows.last() } #[inline] pub fn num_rows(&self) -> usize { - self.num_rows + self.rows.len() } pub fn cursor_mut(&self) -> CursorMut<'_, R, O> { @@ -87,7 +86,6 @@ pub trait CursorOrder: Sized + Copy { Cursor:: { input_index, row_index: 0, - num_rows: rows.len(), rows, _o: PhantomData, } @@ -153,7 +151,7 @@ where } pub fn is_finished(&self) -> bool { - self.row_index == self.cursor.num_rows + self.row_index == self.cursor.rows.len() } pub fn current<'b>(&'b self) -> R::Item<'a> { diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs new file mode 100644 index 000000000000..ddc4bd088e74 --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs @@ -0,0 +1,258 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::VecDeque; +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfo; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::BlockMetaInfoPtr; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; + +use super::list_domain::Candidate; +use super::list_domain::EndDomain; +use super::list_domain::List; +use super::list_domain::Partition; +use super::Rows; +use super::SortedStream; + +pub struct KWaySortPartitioner +where + R: Rows, + S: SortedStream, +{ + schema: DataSchemaRef, + sort_desc: Arc>, + unsorted_streams: Vec, + pending_streams: VecDeque, + + buffer: Vec, + rows: Vec>, + cur_task: usize, + + limit: Option, + total_rows: usize, + + // settings + min_task: usize, + max_task: usize, + max_iter: usize, + search_per_iter: usize, +} + +impl KWaySortPartitioner +where + R: Rows, + S: SortedStream, +{ + pub fn new( + schema: DataSchemaRef, + streams: Vec, + sort_desc: Arc>, + batch_rows: usize, + limit: Option, + ) -> Self { + debug_assert!(streams.len() > 1, "streams.len() = {}", streams.len()); + + let buffer = vec![DataBlock::empty_with_schema(schema.clone()); streams.len()]; + let rows = vec![None; streams.len()]; + let pending_streams = (0..streams.len()).collect(); + + fn get_env(key: &str, default: T) -> T { + std::env::var(key).map_or(default, |s| s.parse::().unwrap_or(default)) + } + + let min_task = + (batch_rows as f64 * get_env("K_WAY_MERGE_SORT_MIN_TASK_FACTOR", 2.0)) as usize; + assert!(min_task > 0); + + let max_task = + (batch_rows as f64 * get_env("K_WAY_MERGE_SORT_MAX_TASK_FACTOR", 4.0)) as usize; + assert!(max_task > 0); + + let max_iter = 
get_env("K_WAY_MERGE_SORT_MAX_ITER", 20); + assert!(max_iter > 0); + + let search_per_iter = get_env("K_WAY_MERGE_SORT_SEARCH_PER_ITER", 4); + assert!(search_per_iter > 0); + + Self { + schema, + sort_desc, + unsorted_streams: streams, + pending_streams, + buffer, + rows, + cur_task: 1, + limit, + total_rows: 0, + + min_task, + max_task, + max_iter, + search_per_iter, + } + } + + pub fn is_finished(&self) -> bool { + self.limit.map_or(false, |limit| self.total_rows >= limit) + || !self.has_pending_stream() && self.rows.iter().all(|x| x.is_none()) + } + + pub fn has_pending_stream(&self) -> bool { + !self.pending_streams.is_empty() + } + + pub fn poll_pending_stream(&mut self) -> Result<()> { + let mut continue_pendings = Vec::new(); + while let Some(i) = self.pending_streams.pop_front() { + debug_assert!(self.buffer[i].is_empty()); + let (input, pending) = self.unsorted_streams[i].next()?; + if pending { + continue_pendings.push(i); + continue; + } + if let Some((block, col)) = input { + self.rows[i] = Some(R::from_column(&col, &self.sort_desc)?); + self.buffer[i] = block; + } + } + self.pending_streams.extend(continue_pendings); + Ok(()) + } + + pub fn next_task(&mut self) -> Result> { + if self.is_finished() { + return Ok(vec![]); + } + + if self.has_pending_stream() { + self.poll_pending_stream()?; + if self.has_pending_stream() { + return Ok(vec![]); + } + } + + Ok(self.build_task()) + } + + fn calc_partition_point(&self) -> Partition { + let mut candidate = Candidate::new(&self.rows, EndDomain { + min: self.min_task, + max: self.max_task, + }); + candidate.init(); + + // if candidate.is_small_task() { + // todo: Consider loading multiple blocks at the same time so that we can avoid cutting out too small a task + // } + + candidate.calc_partition(self.search_per_iter, self.max_iter) + } + + fn build_task(&mut self) -> Vec { + let partition = self.calc_partition_point(); + + let id = self.next_task_id(); + self.total_rows += partition.total; + + let task: Vec<_> = partition + .ends + .iter() + .map(|&(input, pp)| { + let mut block = self.slice(input, pp); + + block.replace_meta(Box::new(SortTaskMeta { + id, + total: partition.total, + input, + })); + + block + }) + .collect(); + task + } + + fn next_task_id(&mut self) -> usize { + let id = self.cur_task; + self.cur_task += 1; + id + } + + fn slice(&mut self, i: usize, pp: usize) -> DataBlock { + let block = &self.buffer[i]; + let rows = self.rows[i].as_ref(); + let n = block.num_rows(); + + if pp < n { + let first_block = block.slice(0..pp); + self.buffer[i] = block.slice(pp..n); + self.rows[i] = Some(rows.unwrap().slice(pp..n)); + first_block + } else { + let first_block = block.clone(); + self.buffer[i] = DataBlock::empty_with_schema(self.schema.clone()); + self.rows[i] = None; + self.pending_streams.push_back(i); + first_block + } + } +} + +impl List for Option { + type Item<'a> = R::Item<'a> where R: 'a; + fn len(&self) -> usize { + match self { + Some(r) => r.len(), + None => 0, + } + } + + fn cmp_value<'a>(&'a self, i: usize, target: &R::Item<'a>) -> Ordering { + self.as_ref().unwrap().row(i).cmp(target) + } + + fn index(&self, i: usize) -> R::Item<'_> { + self.as_ref().unwrap().row(i) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct SortTaskMeta { + pub id: usize, + pub total: usize, + pub input: usize, +} + +impl SortTaskMeta { + pub fn as_meta(self) -> BlockMetaInfoPtr { + Box::new(self) + } +} + +#[typetag::serde(name = "sort_task")] +impl BlockMetaInfo for SortTaskMeta { 
+ fn equals(&self, info: &Box) -> bool { + SortTaskMeta::downcast_ref_from(info).map_or(false, |info| self == info) + } + + fn clone_self(&self) -> Box { + Box::new(*self) + } +} diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/list_domain.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/list_domain.rs new file mode 100644 index 000000000000..9459493363af --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/list_domain.rs @@ -0,0 +1,589 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::BTreeSet; +use std::fmt::Debug; + +pub trait List +where Self: Debug +{ + type Item<'a>: Ord + Debug + where Self: 'a; + + fn len(&self) -> usize; + fn cmp_value<'a>(&'a self, i: usize, target: &Self::Item<'a>) -> Ordering; + fn index(&self, i: usize) -> Self::Item<'_>; + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn first(&self) -> Option> { + if self.is_empty() { + None + } else { + Some(self.index(0)) + } + } + + fn last(&self) -> Option> { + if self.is_empty() { + None + } else { + Some(self.index(self.len() - 1)) + } + } + + fn domain(&self) -> EndDomain { + EndDomain { + min: 0, + max: self.len(), + } + } + + fn search<'a>(&'a self, target: &Self::Item<'a>, domain: EndDomain) -> EndDomain { + if domain.done() { + return domain; + } + let mid = domain.mid(); + if self.cmp_value(mid, target) == Ordering::Greater { + EndDomain { + min: domain.min, + max: mid, + } + } else { + EndDomain { + min: mid + 1, + max: domain.max, + } + } + } +} + +#[derive(Debug)] +pub struct Partition { + pub ends: Vec<(usize, usize)>, + pub total: usize, +} + +impl Partition { + fn new(item: TargetItem<'_, T>) -> Self + where T: List { + let TargetItem { domains, sum, .. 
} = item; + debug_assert!(sum.done()); + + Self { + ends: domains + .iter() + .enumerate() + .filter_map(|(i, domain)| { + debug_assert!(domain.done()); + if domain.is_zero() { + None + } else { + Some((i, domain.min)) + } + }) + .collect(), + total: sum.min, + } + } +} + +pub struct Candidate<'a, T> +where T: List +{ + all_list: &'a [T], + expect: EndDomain, + min_target: Option>, + mid_target: Option>, + max_target: Option>, +} + +struct TargetItem<'a, T> +where + T: List + 'a, + T::Item<'a>: Debug, +{ + target: T::Item<'a>, + domains: Vec, + sum: EndDomain, +} + +impl<'a, T> Candidate<'a, T> +where T: List +{ + pub fn new(all_list: &'a [T], expect: EndDomain) -> Self { + Self { + all_list, + expect, + min_target: None, + mid_target: None, + max_target: None, + } + } + + pub fn init(&mut self) -> bool { + let target: (Option>, Option>) = + self.all_list.iter().fold((None, None), |(min, max), ls| { + let min = match (min, ls.first()) { + (Some(acc), Some(v)) => Some(acc.min(v)), + (None, v @ Some(_)) | (v @ Some(_), None) => v, + (None, None) => None, + }; + let max = match (max, ls.last()) { + (Some(acc), Some(v)) => Some(acc.min(v)), + (None, v @ Some(_)) | (v @ Some(_), None) => v, + (None, None) => None, + }; + + (min, max) + }); + let (min_target, max_target) = if let (Some(min), Some(max)) = target { + (min, max) + } else { + return false; + }; + + let domains = self + .all_list + .iter() + .map(|ls| { + ls.first().map_or(EndDomain::default(), |first| { + if first > max_target { + EndDomain::default() + } else { + ls.domain() + } + }) + }) + .collect::>(); + let sum: EndDomain = domains.iter().copied().sum(); + + self.max_target = Some(TargetItem { + target: max_target, + domains: domains.clone(), + sum, + }); + + self.min_target = Some(TargetItem { + target: min_target, + domains: domains.clone(), + sum, + }); + + true + } + + pub fn is_small_task(&mut self) -> bool { + loop { + let sum = self.do_search_max(Some(8)); + match self.expect.overlaps(sum) { + Overlap::Left => return true, + Overlap::Right => return false, + Overlap::Cross if sum.done() => return false, + Overlap::Cross => (), + } + } + } + + pub fn calc_partition(mut self, n: usize, max_iter: usize) -> Partition { + for _ in 0..max_iter { + match self.overlaps() { + (_, _, Overlap::Cross) => { + let sum = self.do_search_max(Some(n)); + if self.is_finish(sum) { + return Partition::new(self.max_target.unwrap()); + } + } + (_, _, Overlap::Left) => break, + (_, None, Overlap::Right) => { + if let Some(target) = self.find_target() { + self.update_mid(target); + } else { + break; + } + } + ( + min_overlap @ (Overlap::Cross | Overlap::Left), + Some(Overlap::Cross), + Overlap::Right, + ) => { + let sum = self.do_search_mid(Some(n)); + match self.expect.overlaps(sum) { + Overlap::Right => self.cut_right(), + Overlap::Left if matches!(min_overlap, Overlap::Left) => self.cut_left(), + Overlap::Cross if sum.done() => { + return Partition::new(self.mid_target.unwrap()); + } + Overlap::Cross | Overlap::Left => (), + } + } + (Overlap::Cross, Some(Overlap::Left), Overlap::Right) => { + let sum = self.do_search_min(Some(n)); + match self.expect.overlaps(sum) { + Overlap::Left => self.cut_left(), + Overlap::Cross if sum.done() => { + return Partition::new(self.min_target.unwrap()); + } + Overlap::Cross | Overlap::Right => (), + } + } + x => { + if cfg!(debug_assertions) { + unreachable!("unreachable {x:?}"); + } else { + break; + } + } + }; + } + + self.do_search_max(None); + Partition::new(self.max_target.unwrap()) + } + + fn 
do_search_max(&mut self, n: Option) -> EndDomain { + do_search(self.all_list, self.max_target.as_mut().unwrap(), n) + } + + fn do_search_min(&mut self, n: Option) -> EndDomain { + do_search(self.all_list, self.min_target.as_mut().unwrap(), n) + } + + fn do_search_mid(&mut self, n: Option) -> EndDomain { + do_search(self.all_list, self.mid_target.as_mut().unwrap(), n) + } + + fn find_target<'b>(&'b self) -> Option> { + let TargetItem { + target: min_target, + domains: min_domains, + .. + } = self.min_target.as_ref().unwrap(); + + let TargetItem { + target: max_target, + domains: max_domains, + .. + } = self.max_target.as_ref().unwrap(); + + let mut targets = BTreeSet::new(); + + for ((min_domain, max_domain), ls) in min_domains + .iter() + .zip(max_domains.iter()) + .zip(self.all_list.iter()) + { + if max_domain.is_zero() { + continue; + } + let five = EndDomain { + min: min_domain.min, + max: max_domain.min, + } + .five_point(); + for v in five.into_iter().filter_map(|i| { + let v = ls.index(i); + if v >= *min_target && v <= *max_target { + Some(v) + } else { + None + } + }) { + targets.insert(v); + } + } + + let n = targets.len(); + targets.into_iter().nth(n / 2) + } + + fn update_mid(&mut self, target: T::Item<'a>) { + let max = self.max_target.as_ref().unwrap(); + + let domains = max + .domains + .iter() + .map(|domain| EndDomain { + min: 0, + max: domain.max, + }) + .collect::>(); + let sum: EndDomain = domains.iter().copied().sum(); + + self.mid_target = Some(TargetItem { + target, + domains, + sum, + }) + } + + fn is_finish(&self, domain: EndDomain) -> bool { + domain.done() && matches!(self.expect.overlaps(domain), Overlap::Cross) + } + + fn overlaps(&self) -> (Overlap, Option, Overlap) { + ( + self.expect.overlaps(self.min_target.as_ref().unwrap().sum), + self.mid_target + .as_ref() + .map(|item| self.expect.overlaps(item.sum)), + self.expect.overlaps(self.max_target.as_ref().unwrap().sum), + ) + } + + fn cut_left(&mut self) { + self.min_target = self.mid_target.take() + } + + fn cut_right(&mut self) { + self.max_target = self.mid_target.take() + } +} + +fn do_search<'a, T>( + all_list: &'a [T], + item: &mut TargetItem<'a, T>, + n: Option, +) -> EndDomain +where + T: List + 'a, +{ + let TargetItem { + target, + domains, + sum, + } = item; + + domains + .iter_mut() + .zip(all_list.iter()) + .for_each(|(domain, ls)| match n { + Some(n) => { + for _ in 0..n { + if domain.done() { + break; + } + *domain = ls.search(target, *domain) + } + } + None => { + while !domain.done() { + *domain = ls.search(target, *domain) + } + } + }); + *sum = domains.iter().copied().sum(); + *sum +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct EndDomain { + pub min: usize, + pub max: usize, +} + +impl EndDomain { + fn done(&self) -> bool { + self.min == self.max + } + + fn size(&self) -> usize { + self.max - self.min + } + + fn overlaps(&self, rhs: Self) -> Overlap { + if rhs.max < self.min { + return Overlap::Left; + } + if rhs.min > self.max { + return Overlap::Right; + } + Overlap::Cross + } + + fn left_half(&self) -> EndDomain { + EndDomain { + min: self.min, + max: self.mid(), + } + } + + fn right_half(&self) -> EndDomain { + EndDomain { + min: self.mid() + 1, + max: self.max, + } + } + + fn mid(&self) -> usize { + self.min + self.size() / 2 + } + + fn is_zero(&self) -> bool { + *self == EndDomain::default() + } + + fn five_point(&self) -> Vec { + match self.size() { + 0 => vec![], + 1 => vec![self.min], + 2 => vec![self.min, self.max - 1], + 3 => vec![self.min, self.min + 1, 
self.max - 1], + 4 => vec![self.min, self.min + 1, self.min + 2, self.max - 1], + _ => vec![ + self.min, + self.left_half().mid(), + self.mid(), + self.right_half().mid(), + self.max - 1, + ], + } + } +} + +#[derive(Debug)] +enum Overlap { + Left, + Cross, + Right, +} + +impl std::ops::Add for EndDomain { + type Output = EndDomain; + + fn add(self, rhs: Self) -> Self::Output { + EndDomain { + min: self.min + rhs.min, + max: self.max + rhs.max, + } + } +} + +impl std::iter::Sum for EndDomain { + fn sum>(iter: I) -> Self { + iter.reduce(|acc, v| acc + v).unwrap_or_default() + } +} + +impl From> for EndDomain { + fn from(value: std::ops::RangeInclusive) -> Self { + EndDomain { + min: *value.start(), + max: *value.end(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + impl List for &[i32] { + type Item<'a> = &'a i32 where Self: 'a; + + fn cmp_value(&self, i: usize, target: &&i32) -> Ordering { + self[i].cmp(target) + } + + fn len(&self) -> usize { + (*self).len() + } + + fn index(&self, i: usize) -> &i32 { + &self[i] + } + } + + #[test] + fn test_calc_partition() { + { + let all_list: Vec> = vec![vec![]]; + let all_list: Vec<_> = all_list.iter().map(|v| v.as_slice()).collect(); + run_test(&all_list, (5..=10).into(), 10); + } + + { + let all_list: Vec> = vec![ + vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5], + vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5], + ]; + let all_list: Vec<_> = all_list.iter().map(|v| v.as_slice()).collect(); + run_test(&all_list, (5..=10).into(), 10); + } + + for _ in 0..100 { + let all_list = rand_data(); + let all_list: Vec<_> = all_list.iter().map(|v| v.as_slice()).collect(); + + run_test(&all_list, (5..=10).into(), 10) + } + } + + fn rand_data() -> Vec> { + use rand::Rng; + let mut rng = rand::thread_rng(); + + (0..5) + .map(|_| { + let rows: usize = rng.gen_range(0..=20); + let mut data = (0..rows) + .map(|_| rng.gen_range(0..=1000)) + .collect::>(); + data.sort(); + data + }) + .collect::>() + } + + fn run_test(all_list: &[&[i32]], expect_size: EndDomain, max_iter: usize) { + let mut candidate = Candidate::new(all_list, expect_size); + + let got = if candidate.init() { + candidate.calc_partition(3, max_iter) + } else { + let sum: usize = all_list.iter().map(|ls| ls.len()).sum(); + assert_eq!(sum, 0); + return; + }; + + // println!("total {}", got.total); + + let sum: usize = got.ends.iter().map(|(_, end)| *end).sum(); + assert_eq!(sum, got.total, "all_list {all_list:?}"); + + let x = got + .ends + .iter() + .copied() + .map(|(i, end)| { + let ls = all_list[i]; + (ls[..end].last(), ls[end..].first()) + }) + .fold((None, None), |acc, (end, start)| { + (acc.0.max(end), match (acc.1, start) { + (None, None) => None, + (None, v @ Some(_)) | (v @ Some(_), None) => v, + (Some(a), Some(b)) => Some(a.min(b)), + }) + }); + match x { + (Some(a), Some(b)) => assert!(a < b, "all_list {all_list:?}"), + (None, None) => unreachable!(), + _ => (), + } + } +} diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs index f5cb944f8872..fab4f3c2daeb 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs @@ -78,7 +78,7 @@ where let sorted_cursors = A::with_capacity(streams.len()); let buffer = vec![DataBlock::empty_with_schema(schema.clone()); streams.len()]; - let pending_stream = (0..streams.len()).collect(); + let pending_streams = 
(0..streams.len()).collect(); Self { schema, @@ -88,7 +88,7 @@ where batch_rows, limit, sort_desc, - pending_streams: pending_stream, + pending_streams, temp_sorted_num_rows: 0, temp_output_indices: vec![], temp_sorted_blocks: vec![], diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/mod.rs index bb6bdfc0aa9c..84ba57d43a72 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/mod.rs @@ -14,6 +14,8 @@ pub mod algorithm; mod cursor; +mod k_way_merge_sort_partition; +mod list_domain; mod loser_tree; mod merger; mod rows; @@ -21,6 +23,8 @@ mod spill; pub mod utils; pub use cursor::*; +pub use k_way_merge_sort_partition::KWaySortPartitioner; +pub use k_way_merge_sort_partition::SortTaskMeta; pub use merger::*; pub use rows::*; pub use spill::*; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs index 965784432106..6db4b9188601 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Range; + use databend_common_exception::Result; use databend_common_expression::types::binary::BinaryColumn; use databend_common_expression::types::binary::BinaryColumnBuilder; @@ -53,6 +55,10 @@ impl Rows for BinaryColumn { fn try_from_column(col: &Column, _: &[SortColumnDescription]) -> Option { col.as_binary().cloned() } + + fn slice(&self, range: Range) -> Self { + self.slice(range) + } } impl RowConverter for CommonRowConverter { diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs index 78b33ebedd12..692450b828ca 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs @@ -15,6 +15,8 @@ mod common; mod simple; +use std::fmt::Debug; + pub use common::*; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -39,9 +41,9 @@ where Self: Sized /// Rows can be compared. 
pub trait Rows -where Self: Sized + Clone +where Self: Sized + Clone + Debug { - type Item<'a>: Ord + type Item<'a>: Ord + Debug where Self: 'a; type Type: ArgType; @@ -68,4 +70,14 @@ where Self: Sized + Clone fn is_empty(&self) -> bool { self.len() == 0 } + + fn first(&self) -> Self::Item<'_> { + self.row(0) + } + + fn last(&self) -> Self::Item<'_> { + self.row(self.len() - 1) + } + + fn slice(&self, range: std::ops::Range) -> Self; } diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs index ac11b47015bd..294733e1c3c8 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs @@ -14,6 +14,7 @@ use std::cmp::Reverse; use std::marker::PhantomData; +use std::ops::Range; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -30,7 +31,7 @@ use super::RowConverter; use super::Rows; /// Rows structure for single simple types. (numbers, date, timestamp) -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SimpleRowsAsc { inner: T::Column, } @@ -66,10 +67,16 @@ where None } } + + fn slice(&self, range: Range) -> Self { + Self { + inner: T::slice_column(&self.inner, range), + } + } } /// Rows structure for single simple types. (numbers, date, timestamp) -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SimpleRowsDesc { inner: T::Column, } @@ -106,6 +113,12 @@ where None } } + + fn slice(&self, range: Range) -> Self { + Self { + inner: T::slice_column(&self.inner, range), + } + } } /// If there is only one sort field and its type is a primitive type, @@ -129,27 +142,7 @@ where } fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result> { - assert!(columns.len() == 1); - let col = &columns[0]; - if col.data_type != T::data_type() { - return Err(ErrorCode::Internal(format!( - "Cannot convert simple column. 
Expect data type {:?}, found {:?}", - T::data_type(), - col.data_type - ))); - } - - let col = match &col.value { - Value::Scalar(v) => { - let builder = ColumnBuilder::repeat(&v.as_ref(), num_rows, &col.data_type); - builder.build() - } - Value::Column(c) => c.clone(), - }; - - Ok(SimpleRowsAsc { - inner: T::try_downcast_column(&col).unwrap(), - }) + self.convert_rows(columns, num_rows, true) } } @@ -168,6 +161,17 @@ where } fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result> { + self.convert_rows(columns, num_rows, false) + } +} + +impl SimpleRowConverter { + fn convert_rows( + &mut self, + columns: &[BlockEntry], + num_rows: usize, + asc: bool, + ) -> Result { assert!(columns.len() == 1); let col = &columns[0]; if col.data_type != T::data_type() { @@ -186,8 +190,13 @@ where Value::Column(c) => c.clone(), }; - Ok(SimpleRowsDesc { - inner: T::try_downcast_column(&col).unwrap(), - }) + let desc = [SortColumnDescription { + offset: 0, + asc, + nulls_first: false, + is_nullable: false, + }]; + + R::from_column(&col, &desc) } } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs new file mode 100644 index 000000000000..293160dc1568 --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs @@ -0,0 +1,785 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
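+// Overview note (editorial, not in the original patch): this new file wires a
+// three-stage k-way merge. KWayMergePartitionerProcessor cuts the K sorted
+// input streams into bounded tasks via KWaySortPartitioner, several
+// KWayMergeWorkerProcessor instances each merge the per-stream slices of one
+// task with Merger, and KWayMergeCombinerProcessor re-emits the merged tasks in
+// task-id order. A hedged usage sketch of the public entry point follows; the
+// function name and the literal worker/block-size values are illustrative
+// assumptions, only the add_k_way_merge_sort signature comes from this file.
+fn example_k_way_wiring(
+    pipeline: &mut Pipeline,
+    schema: DataSchemaRef,
+    sort_desc: Arc<Vec<SortColumnDescription>>,
+) -> Result<()> {
+    // The pipeline is expected to already expose K > 1 sorted output streams,
+    // each with the order column appended as its last column.
+    add_k_way_merge_sort(
+        pipeline,
+        schema,
+        4,     // worker: number of parallel merge workers (assumed value)
+        65536, // block_size: rows per task/output block (assumed value)
+        None,  // limit: no row limit in this sketch
+        sort_desc,
+        true, // remove_order_col: strip the order column in the workers
+        true, // enable_loser_tree: prefer LoserTreeSort over HeapSort
+    )
+}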
+ +use std::any::Any; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::ops::ControlFlow; +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::binary::BinaryColumn; +use databend_common_expression::types::*; +use databend_common_expression::with_number_mapped_type; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipe; +use databend_common_pipeline_core::PipeItem; +use databend_common_pipeline_core::Pipeline; +use match_template::match_template; + +use super::sort::algorithm::HeapSort; +use super::sort::algorithm::LoserTreeSort; +use super::sort::algorithm::SortAlgorithm; +use super::sort::KWaySortPartitioner; +use super::sort::Merger; +use super::sort::Rows; +use super::sort::SimpleRowsAsc; +use super::sort::SimpleRowsDesc; +use super::sort::SortTaskMeta; +use super::sort::SortedStream; +use super::transform_multi_sort_merge::InputBlockStream; + +pub fn add_k_way_merge_sort( + pipeline: &mut Pipeline, + schema: DataSchemaRef, + worker: usize, + block_size: usize, + limit: Option, + sort_desc: Arc>, + remove_order_col: bool, + enable_loser_tree: bool, +) -> Result<()> { + if pipeline.is_empty() { + return Err(ErrorCode::Internal("Cannot resize empty pipe.")); + } + + match pipeline.output_len() { + 0 => Err(ErrorCode::Internal("Cannot resize empty pipe.")), + 1 => Ok(()), + stream_size => { + macro_rules! add { + ($algo: ty) => {{ + let b = Builder::<$algo> { + schema, + stream_size, + worker, + sort_desc, + block_size, + limit, + remove_order_col, + _a: Default::default(), + }; + b.build(pipeline) + }}; + } + + if sort_desc.len() == 1 { + let sort_type = schema.field(sort_desc[0].offset).data_type(); + let asc = sort_desc[0].asc; + match_template! 
{ + T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], + match sort_type { + DataType::T => return match (enable_loser_tree, asc) { + (true, true) => add!(LoserTreeSort>), + (true, false) => add!(LoserTreeSort>), + (false, true) => add!(HeapSort>), + (false, false) => add!(HeapSort>), + }, + DataType::Number(num_ty) => { + return with_number_mapped_type!(|NUM_TYPE| match num_ty { + NumberDataType::NUM_TYPE => match (enable_loser_tree, asc) { + (true, true) => + add!(LoserTreeSort>>), + (true, false) => + add!(LoserTreeSort>>), + (false, true) => + add!(HeapSort>>), + (false, false) => + add!(HeapSort>>), + }, + }) + } + _ => (), + } + } + }; + if enable_loser_tree { + add!(LoserTreeSort) + } else { + add!(HeapSort) + } + } + } +} + +struct Builder +where A: SortAlgorithm +{ + schema: DataSchemaRef, + stream_size: usize, + worker: usize, + sort_desc: Arc>, + block_size: usize, + limit: Option, + remove_order_col: bool, + _a: PhantomData, +} + +impl Builder +where + A: SortAlgorithm + Send + 'static, + ::Rows: Send + 'static, +{ + fn create_partitioner(&self, input: usize) -> Pipe { + create_partitioner_pipe::( + (0..input).map(|_| InputPort::create()).collect(), + self.worker, + self.schema.clone(), + self.sort_desc.clone(), + self.block_size, + self.limit, + ) + } + + fn create_worker( + &self, + input: Arc, + stream_size: usize, + output: Arc, + batch_rows: usize, + ) -> KWayMergeWorkerProcessor { + KWayMergeWorkerProcessor::::new( + input, + stream_size, + output, + self.schema.clone(), + batch_rows, + self.sort_desc.clone(), + self.remove_order_col, + ) + } + + fn create_combiner(&self) -> Pipe { + let inputs_port = (0..self.worker) + .map(|_| InputPort::create()) + .collect::>(); + let output = OutputPort::create(); + + let processor = ProcessorPtr::create(Box::new(KWayMergeCombinerProcessor::new( + inputs_port.clone(), + output.clone(), + self.limit, + ))); + + Pipe::create(self.worker, 1, vec![PipeItem::create( + processor, + inputs_port, + vec![output], + )]) + } + + fn build(&self, pipeline: &mut Pipeline) -> Result<()> { + pipeline.add_pipe(self.create_partitioner(self.stream_size)); + + pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(Box::new(self.create_worker( + input, + self.stream_size, + output, + self.block_size, + )))) + })?; + + pipeline.add_pipe(self.create_combiner()); + Ok(()) + } +} + +pub fn create_partitioner_pipe( + inputs_port: Vec>, + worker: usize, + schema: DataSchemaRef, + sort_desc: Arc>, + batch_rows: usize, + limit: Option, +) -> Pipe +where + R: Rows + Send + 'static, +{ + let outputs_port: Vec<_> = (0..worker).map(|_| OutputPort::create()).collect(); + let processor = ProcessorPtr::create(Box::new(KWayMergePartitionerProcessor::::new( + inputs_port.clone(), + outputs_port.clone(), + schema, + sort_desc, + batch_rows, + limit, + ))); + + Pipe::create(inputs_port.len(), worker, vec![PipeItem::create( + processor, + inputs_port, + outputs_port, + )]) +} + +pub struct KWayMergePartitionerProcessor { + partitioner: KWaySortPartitioner, + inputs: Vec>, + outputs: Vec>, + + task: VecDeque, + cur_output: Option, + next_output: usize, +} + +impl KWayMergePartitionerProcessor { + pub fn new( + inputs: Vec>, + outputs: Vec>, + schema: DataSchemaRef, + sort_desc: Arc>, + batch_rows: usize, + limit: Option, + ) -> Self { + let streams = inputs + .iter() + .map(|i| InputBlockStream::new(i.clone(), false)) + .collect::>(); + + Self { + partitioner: KWaySortPartitioner::new(schema, streams, sort_desc, batch_rows, limit), + 
inputs, + outputs, + task: VecDeque::new(), + cur_output: None, + next_output: 0, + } + } + + fn find_output(&mut self) -> Option { + let n = self.outputs.len(); + for mut i in self.next_output..self.next_output + n { + if i >= n { + i -= n; + } + if self.outputs[i].can_push() { + self.cur_output = Some(i); + self.next_output = if i + 1 == n { 0 } else { i + 1 }; + return self.cur_output; + } + } + None + } +} + +impl Processor for KWayMergePartitionerProcessor +where R: Rows + Send + 'static +{ + fn name(&self) -> String { + "KWayMergePartitioner".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.outputs.iter().all(|o| o.is_finished()) { + self.inputs.iter().for_each(|i| i.finish()); + return Ok(Event::Finished); + } + + if !self.task.is_empty() { + let output = match self.cur_output { + Some(i) => &self.outputs[i], + None => match self.find_output() { + Some(cur) => &self.outputs[cur], + None => return Ok(Event::NeedConsume), + }, + }; + + if !output.can_push() { + return Ok(Event::NeedConsume); + } + + let block = self.task.pop_front().unwrap(); + debug_assert!(block.num_rows() > 0); + output.push_data(Ok(block)); + if self.task.is_empty() { + self.cur_output = None; + } + return Ok(Event::NeedConsume); + } + + if self.partitioner.is_finished() { + self.outputs.iter().for_each(|o| o.finish()); + self.inputs.iter().for_each(|i| i.finish()); + return Ok(Event::Finished); + } + + if self.cur_output.is_none() && self.find_output().is_none() { + return Ok(Event::NeedConsume); + } + + self.partitioner.poll_pending_stream()?; + if self.partitioner.has_pending_stream() { + Ok(Event::NeedData) + } else { + Ok(Event::Sync) + } + } + + fn process(&mut self) -> Result<()> { + let task = self.partitioner.next_task()?; + self.task.extend(task); + Ok(()) + } +} + +pub struct KWayMergeWorkerProcessor +where A: SortAlgorithm +{ + input: Arc, + output: Arc, + stream_size: usize, + schema: DataSchemaRef, + batch_rows: usize, + sort_desc: Arc>, + remove_order_col: bool, + + buffer: Vec, + task: Option, + output_data: VecDeque, + _a: PhantomData, +} + +impl KWayMergeWorkerProcessor +where A: SortAlgorithm +{ + pub fn new( + input: Arc, + stream_size: usize, + output: Arc, + schema: DataSchemaRef, + batch_rows: usize, + sort_desc: Arc>, + remove_order_col: bool, + ) -> Self { + Self { + input, + output, + output_data: VecDeque::new(), + stream_size, + schema, + batch_rows, + sort_desc, + remove_order_col, + buffer: Vec::new(), + task: None, + _a: Default::default(), + } + } + + fn ready(&self) -> bool { + self.task.map_or(false, |state| state.done()) + } + + fn pull(&mut self) -> Result { + if self.ready() { + return Ok(Event::Sync); + } + if !self.input.has_data() { + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + self.input.set_need_data(); + return Ok(Event::NeedData); + } + + let mut block = self.input.pull_data().unwrap()?; + self.input.set_need_data(); + + let incoming = SortTaskMeta::downcast_ref_from(block.get_meta().unwrap()).unwrap(); + + let rows = block.num_rows(); + match &mut self.task { + Some(task) => { + debug_assert_eq!(incoming.id, task.id); + debug_assert_eq!(incoming.total, task.total); + assert!(task.remain >= rows); + task.remain -= rows; + + if task.done() { + self.buffer.push(block); + Ok(Event::Sync) + } else { + self.buffer.push(block); + Ok(Event::NeedData) + } + } + None if incoming.total == rows => { + if self.remove_order_col { + block.pop_columns(1); + } + + 
self.output_data.push_back(block); + Ok(Event::NeedConsume) + } + None => { + assert!(incoming.total >= rows); + self.task = Some(TaskState { + id: incoming.id, + total: incoming.total, + remain: incoming.total - rows, + }); + self.buffer.push(block); + Ok(Event::NeedData) + } + } + } + + fn streams(&mut self) -> Vec { + let mut streams = vec![VecDeque::new(); self.stream_size]; + + for mut block in self.buffer.drain(..) { + let meta = SortTaskMeta::downcast_from(block.take_meta().unwrap()).unwrap(); + + let sort_col = block.get_last_column().clone(); + if self.remove_order_col { + block.pop_columns(1); + } + + streams[meta.input].push_back((block, sort_col)); + } + streams + } +} + +impl Processor for KWayMergeWorkerProcessor +where A: SortAlgorithm + Send + 'static +{ + fn name(&self) -> String { + "KWayMergeWorker".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if let Some(block) = self.output_data.pop_front() { + debug_assert!(block.num_rows() > 0); + self.output.push_data(Ok(block)); + return Ok(Event::NeedConsume); + } + + match self.pull()? { + e @ (Event::NeedData | Event::Finished | Event::Sync) => Ok(e), + Event::NeedConsume => { + let block = self.output_data.pop_front().unwrap(); + debug_assert!(block.num_rows() > 0); + self.output.push_data(Ok(block)); + Ok(Event::NeedConsume) + } + _ => unreachable!(), + } + } + + fn process(&mut self) -> Result<()> { + debug_assert!(self.ready()); + let task = self.task.take().unwrap(); + + let mut merger = Merger::::create( + self.schema.clone(), + self.streams(), + self.sort_desc.clone(), + if task.total > self.batch_rows { + task.total / (task.total / self.batch_rows) + } else { + self.batch_rows + }, + None, + ); + + let mut rows = 0; + while let Some(block) = merger.next_block()? { + rows += block.num_rows(); + + let meta = SortTaskMeta { + id: task.id, + total: task.total, + input: 0, + } + .as_meta(); + self.output_data.push_back(block.add_meta(Some(meta))?); + } + debug_assert_eq!(rows, task.total); + debug_assert!(merger.is_finished()); + Ok(()) + } +} + +pub struct KWayMergeCombinerProcessor { + inputs: Vec>, + output: Arc, + + cur_input: Option, + cur_task: usize, + buffer: Vec>, + state: Vec>, + limit: Option, +} + +#[derive(Debug, Clone, Copy)] +struct TaskState { + id: usize, + total: usize, + remain: usize, +} + +impl TaskState { + fn done(&self) -> bool { + self.remain == 0 + } +} + +impl KWayMergeCombinerProcessor { + pub fn new(inputs: Vec>, output: Arc, limit: Option) -> Self { + let buffer = vec![VecDeque::new(); inputs.len()]; + let state = vec![None; inputs.len()]; + Self { + inputs, + output, + cur_input: None, + cur_task: 1, + buffer, + state, + limit, + } + } + + fn pull(&mut self, i: usize) -> Result { + let input = &self.inputs[i]; + let buffer = &mut self.buffer[i]; + let cur_state = &mut self.state[i]; + + if let Some(cur) = cur_state { + if cur.done() { + return if cur.id == self.cur_task { + Ok(PullEvent::Data) + } else { + Ok(PullEvent::Pending) + }; + } + } + + if input.has_data() { + let mut block = input.pull_data().unwrap()?; + input.set_need_data(); + let incoming = SortTaskMeta::downcast_from(block.take_meta().unwrap()).unwrap(); + + let task_id = match cur_state { + None => { + debug_assert!(incoming.total > 0); + debug_assert!( + incoming.id >= self.cur_task, + "disorder task. 
cur_task: {} incoming.id: {} incoming.total: {}", + self.cur_task, + incoming.id, + incoming.total + ); + let SortTaskMeta { id, total, .. } = incoming; + let _ = cur_state.insert(TaskState { + id, + total, + remain: total - block.num_rows(), + }); + buffer.push_back(block); + id + } + Some(cur) => { + debug_assert_eq!(cur.id, incoming.id); + debug_assert_eq!(cur.total, incoming.total); + + let rows = block.num_rows(); + if rows <= cur.remain { + buffer.push_back(block); + cur.remain -= rows; + cur.id + } else { + unreachable!() + } + } + }; + return if task_id == self.cur_task { + Ok(PullEvent::Data) + } else { + Ok(PullEvent::Pending) + }; + } + + if !buffer.is_empty() { + return if cur_state.unwrap().id == self.cur_task { + Ok(PullEvent::Data) + } else { + Ok(PullEvent::Pending) + }; + } + + if input.is_finished() { + return Ok(PullEvent::Finished); + } + + input.set_need_data(); + Ok(PullEvent::Pending) + } + + fn push(&mut self) -> bool { + let cur_input = self.cur_input.unwrap(); + let buffer = &mut self.buffer[cur_input]; + let cur_state = &mut self.state[cur_input]; + debug_assert_eq!(cur_state.unwrap().id, self.cur_task); + if let Some(block) = buffer.pop_front() { + let block = match &mut self.limit { + None => block, + Some(limit) => { + let n = block.num_rows(); + if *limit >= n { + *limit -= n; + block + } else { + let range = 0..*limit; + *limit = 0; + block.slice(range) + } + } + }; + debug_assert!(block.num_rows() > 0); + self.output.push_data(Ok(block)); + if buffer.is_empty() && cur_state.unwrap().done() { + self.cur_task += 1; + cur_state.take(); + self.cur_input = None; + } + true + } else { + false + } + } + + fn find(&mut self) -> Result { + let event = + (0..self.inputs.len()).try_fold(PullEvent::Finished, |e, i| match self.pull(i) { + Ok(PullEvent::Data) => { + self.cur_input = Some(i); + ControlFlow::Break(None) + } + Ok(PullEvent::Pending) => ControlFlow::Continue(PullEvent::Pending), + Ok(PullEvent::Finished) => ControlFlow::Continue(e), + Err(e) => ControlFlow::Break(Some(e)), + }); + match event { + ControlFlow::Continue(e) => Ok(e), + ControlFlow::Break(None) => Ok(PullEvent::Data), + ControlFlow::Break(Some(e)) => Err(e), + } + } +} + +impl Processor for KWayMergeCombinerProcessor { + fn name(&self) -> String { + "KWayMergeCombiner".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() || self.limit.map_or(false, |limit| limit == 0) { + self.inputs.iter().for_each(|i| i.finish()); + self.output.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if self.cur_input.is_none() { + match self.find()? { + PullEvent::Pending => return Ok(Event::NeedData), + PullEvent::Finished => { + self.output.finish(); + return Ok(Event::Finished); + } + PullEvent::Data => (), + } + } + + let pushed = self.push(); + match self.cur_input { + Some(cur) => match (pushed, self.pull(cur)?) { + (true, PullEvent::Pending | PullEvent::Data) => Ok(Event::NeedConsume), + (true, PullEvent::Finished) => { + self.cur_input = None; + Ok(Event::NeedConsume) + } + (false, PullEvent::Pending) => Ok(Event::NeedData), + (false, PullEvent::Data) => { + if self.push() { + Ok(Event::NeedConsume) + } else { + unreachable!() + } + } + (false, PullEvent::Finished) => { + todo!("unexpected finish") + } + }, + None => match self.find()? 
{ + PullEvent::Pending | PullEvent::Data => Ok(Event::NeedConsume), + PullEvent::Finished => { + self.output.finish(); + Ok(Event::NeedConsume) + } + }, + } + } +} + +type BlockStream = VecDeque<(DataBlock, Column)>; + +impl SortedStream for BlockStream { + fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { + Ok((self.pop_front(), false)) + } +} + +enum PullEvent { + Data, + Pending, + Finished, +} diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 39c59d9e17b7..ea77bd03ed00 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -42,11 +42,11 @@ use databend_common_pipeline_core::Pipeline; use super::sort::algorithm::HeapSort; use super::sort::algorithm::LoserTreeSort; use super::sort::algorithm::SortAlgorithm; +use super::sort::utils::ORDER_COL_NAME; use super::sort::Merger; use super::sort::SimpleRowsAsc; use super::sort::SimpleRowsDesc; use super::sort::SortedStream; -use crate::processors::sort::utils::ORDER_COL_NAME; pub fn try_add_multi_sort_merge( pipeline: &mut Pipeline, @@ -163,13 +163,13 @@ fn create_processor( }) } -struct BlockStream { +pub struct InputBlockStream { input: Arc, remove_order_col: bool, } -impl BlockStream { - fn new(input: Arc, remove_order_col: bool) -> Self { +impl InputBlockStream { + pub fn new(input: Arc, remove_order_col: bool) -> Self { Self { input, remove_order_col, @@ -177,7 +177,7 @@ impl BlockStream { } } -impl SortedStream for BlockStream { +impl SortedStream for InputBlockStream { fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { if self.input.has_data() { let mut block = self.input.pull_data().unwrap()?; @@ -200,7 +200,7 @@ impl SortedStream for BlockStream { pub struct MultiSortMergeProcessor where A: SortAlgorithm { - merger: Merger, + merger: Merger, /// This field is used to drive the processor's state. /// @@ -225,10 +225,10 @@ where A: SortAlgorithm ) -> Result { let streams = inputs .iter() - .map(|i| BlockStream::new(i.clone(), remove_order_col)) + .map(|i| InputBlockStream::new(i.clone(), remove_order_col)) .collect::>(); let merger = - Merger::::create(schema, streams, sort_desc, block_size, limit); + Merger::::create(schema, streams, sort_desc, block_size, limit); Ok(Self { merger, inputs, diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 548685d7dbf1..d7eb12a7db39 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -116,18 +116,6 @@ impl PipelineBuilder { }); } - // merge sort - let final_block_size = - block_thresholds.calc_rows_per_block(task.total_bytes, task.total_rows); - let partial_block_size = if self.main_pipeline.output_len() > 1 { - std::cmp::min( - final_block_size, - self.ctx.get_settings().get_max_block_size()? 
as usize, - ) - } else { - final_block_size - }; - // construct output fields let output_fields = cluster_stats_gen.out_fields.clone(); let schema = DataSchemaRefExt::create(output_fields); @@ -142,11 +130,14 @@ impl PipelineBuilder { }) .collect(); + // merge sort + let sort_block_size = + block_thresholds.calc_rows_per_block(task.total_bytes, task.total_rows); + self.ctx.set_enable_sort_spill(false); let sort_pipeline_builder = SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs)) - .with_partial_block_size(partial_block_size) - .with_final_block_size(final_block_size) + .with_block_size_hit(sort_block_size) .remove_order_col_at_last(); sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?; diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index 6c933b165b0b..48a31bc99244 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -21,6 +21,7 @@ use databend_common_expression::SortColumnDescription; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_transforms::processors::add_k_way_merge_sort; use databend_common_pipeline_transforms::processors::sort::utils::add_order_field; use databend_common_pipeline_transforms::processors::try_add_multi_sort_merge; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; @@ -97,7 +98,6 @@ impl PipelineBuilder { limit: Option, after_exchange: Option, ) -> Result<()> { - let block_size = self.settings.get_max_block_size()? as usize; let max_threads = self.settings.get_max_threads()? as usize; let sort_desc = Arc::new(sort_desc); @@ -106,11 +106,8 @@ impl PipelineBuilder { self.main_pipeline.try_resize(max_threads)?; } - let mut builder = - SortPipelineBuilder::create(self.ctx.clone(), plan_schema.clone(), sort_desc.clone()) - .with_partial_block_size(block_size) - .with_final_block_size(block_size) - .with_limit(limit); + let builder = + SortPipelineBuilder::create(self.ctx.clone(), plan_schema, sort_desc).with_limit(limit); match after_exchange { Some(true) => { @@ -119,18 +116,13 @@ impl PipelineBuilder { // as the data is already sorted in each cluster node. // The input number of the transform is equal to the number of cluster nodes. if self.main_pipeline.output_len() > 1 { - try_add_multi_sort_merge( - &mut self.main_pipeline, - plan_schema, - block_size, - limit, - sort_desc, - true, - self.ctx.get_settings().get_enable_loser_tree_merge_sort()?, - ) + builder + .remove_order_col_at_last() + .build_multi_merge(&mut self.main_pipeline) } else { - builder = builder.remove_order_col_at_last(); - builder.build_merge_sort_pipeline(&mut self.main_pipeline, true) + builder + .remove_order_col_at_last() + .build_merge_sort_pipeline(&mut self.main_pipeline, true) } } Some(false) => { @@ -142,8 +134,9 @@ impl PipelineBuilder { None => { // Build for single node mode. // We build the full sort pipeline for it. 
- builder = builder.remove_order_col_at_last(); - builder.build_full_sort_pipeline(&mut self.main_pipeline) + builder + .remove_order_col_at_last() + .build_full_sort_pipeline(&mut self.main_pipeline) } } } @@ -154,8 +147,7 @@ pub struct SortPipelineBuilder { schema: DataSchemaRef, sort_desc: Arc>, limit: Option, - partial_block_size: usize, - final_block_size: usize, + block_size: Option, remove_order_col_at_last: bool, } @@ -170,8 +162,7 @@ impl SortPipelineBuilder { schema, sort_desc, limit: None, - partial_block_size: 0, - final_block_size: 0, + block_size: None, remove_order_col_at_last: false, } } @@ -181,13 +172,9 @@ impl SortPipelineBuilder { self } - pub fn with_partial_block_size(mut self, partial_block_size: usize) -> Self { - self.partial_block_size = partial_block_size; - self - } - - pub fn with_final_block_size(mut self, final_block_size: usize) -> Self { - self.final_block_size = final_block_size; + // The expected output block size, the actual output block size will be equal to or less than the given value. + pub fn with_block_size_hit(mut self, block_size: usize) -> Self { + self.block_size = Some(block_size); self } @@ -268,15 +255,19 @@ impl SortPipelineBuilder { self.schema.clone() }; - let enable_loser_tree = self.ctx.get_settings().get_enable_loser_tree_merge_sort()?; - let spilling_batch_bytes = self.ctx.get_settings().get_sort_spilling_batch_bytes()?; + let settings = self.ctx.get_settings(); + + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + let spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; + let block_size = settings.get_max_block_size()? as usize; + pipeline.add_transform(|input, output| { let builder = TransformSortMergeBuilder::create( input, output, sort_merge_output_schema.clone(), self.sort_desc.clone(), - self.partial_block_size, + block_size, ) .with_limit(self.limit) .with_order_col_generated(order_col_generated) @@ -311,19 +302,45 @@ impl SortPipelineBuilder { })?; } - if need_multi_merge { - // Multi-pipelines merge sort + if !need_multi_merge { + return Ok(()); + } + + self.build_multi_merge(pipeline) + } + + pub fn build_multi_merge(self, pipeline: &mut Pipeline) -> Result<()> { + // Multi-pipelines merge sort + let settings = self.ctx.get_settings(); + + let block_size = match self.block_size { + Some(block_size) => block_size.min(settings.get_max_block_size()? as usize), + None => settings.get_max_block_size()? as usize, + }; + + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + let max_threads = settings.get_max_threads()? as usize; + if settings.get_enable_parallel_multi_merge_sort()? 
{ + add_k_way_merge_sort( + pipeline, + self.schema.clone(), + max_threads, + block_size, + self.limit, + self.sort_desc, + self.remove_order_col_at_last, + enable_loser_tree, + ) + } else { try_add_multi_sort_merge( pipeline, self.schema.clone(), - self.final_block_size, + block_size, self.limit, self.sort_desc, self.remove_order_col_at_last, - self.ctx.get_settings().get_enable_loser_tree_merge_sort()?, - )?; + enable_loser_tree, + ) } - - Ok(()) } } diff --git a/src/query/service/tests/it/pipelines/mod.rs b/src/query/service/tests/it/pipelines/mod.rs index 51427e097588..4fa71f2eb15a 100644 --- a/src/query/service/tests/it/pipelines/mod.rs +++ b/src/query/service/tests/it/pipelines/mod.rs @@ -15,3 +15,4 @@ mod builders; mod executor; mod filter; +mod transforms; diff --git a/src/query/service/tests/it/pipelines/transforms/mod.rs b/src/query/service/tests/it/pipelines/transforms/mod.rs new file mode 100644 index 000000000000..e77c67521e18 --- /dev/null +++ b/src/query/service/tests/it/pipelines/transforms/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod sort; diff --git a/src/query/service/tests/it/pipelines/transforms/sort.rs b/src/query/service/tests/it/pipelines/transforms/sort.rs new file mode 100644 index 000000000000..3cc91634ba0a --- /dev/null +++ b/src/query/service/tests/it/pipelines/transforms/sort.rs @@ -0,0 +1,285 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
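The k-way path selected by build_multi_merge above relies on merge-path style partitioning (the `merge_path` / `KWaySortPartition` work in this patch): for a target output size k, a binary search along the cross diagonal of two sorted runs finds split points (i, j) with i + j = k so that each side can be merged by an independent worker. Below is a minimal, self-contained sketch of that search under the usual merge-path assumptions; `merge_path_partition` is an illustrative name, not part of this patch, and the real list_domain.rs generalizes the search to k runs.

/// Find (i, j) with i + j = k such that a[..i] and b[..j] together hold the
/// k smallest rows of the merged sequence, so the prefix slices and the
/// remainders can be merged by independent workers.
fn merge_path_partition(a: &[i32], b: &[i32], k: usize) -> (usize, usize) {
    assert!(k <= a.len() + b.len());
    // Feasible range for i: at most a.len() rows can come from `a`,
    // and at least k - b.len() of them must.
    let mut lo = k.saturating_sub(b.len());
    let mut hi = k.min(a.len());
    while lo < hi {
        let i = (lo + hi) / 2;
        let j = k - i;
        if j > 0 && a[i] < b[j - 1] {
            // a[i] still belongs to the first k rows: take more from `a`.
            lo = i + 1;
        } else {
            hi = i;
        }
    }
    (lo, k - lo)
}

fn main() {
    let a = [1, 3, 5, 7, 9];
    let b = [2, 4, 6, 8];
    let (i, j) = merge_path_partition(&a, &b, 4);
    // One worker merges a[..i] with b[..j]; another merges the remainders.
    assert_eq!((i, j), (2, 2));
    assert!(a[..i].iter().chain(&b[..j]).all(|x| *x <= 4));
    println!("split at a[..{i}], b[..{j}]");
}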
+ +use std::collections::VecDeque; +use std::sync::Arc; + +use databend_common_base::base::tokio; +use databend_common_base::base::tokio::sync::mpsc::channel; +use databend_common_base::base::tokio::sync::mpsc::Receiver; +use databend_common_exception::Result; +use databend_common_expression::block_debug::pretty_format_blocks; +use databend_common_expression::types::Int32Type; +use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::FromData; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipe; +use databend_common_pipeline_core::PipeItem; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sinks::SyncSenderSink; +use databend_common_pipeline_sources::BlocksSource; +use databend_common_pipeline_transforms::processors::add_k_way_merge_sort; +use databend_query::pipelines::executor::ExecutorSettings; +use databend_query::pipelines::executor::QueryPipelineExecutor; +use databend_query::sessions::QueryContext; +use databend_query::test_kits::TestFixture; +use itertools::Itertools; +use parking_lot::Mutex; +use rand::rngs::ThreadRng; +use rand::Rng; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_k_way_merge_sort() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let worker = 3; + let block_size = 4; + let limit = None; + let (data, expected) = basic_test_data(None); + let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; + + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + } + + check_result(got, expected); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_k_way_merge_sort_fuzz() -> Result<()> { + let mut rng = rand::thread_rng(); + let fixture = TestFixture::setup().await?; + + for _ in 0..10 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, false).await?; + } + + for _ in 0..10 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, true).await?; + } + Ok(()) +} + +async fn run_fuzz(ctx: Arc, rng: &mut ThreadRng, with_limit: bool) -> Result<()> { + let worker = rng.gen_range(1..=5); + let block_size = rng.gen_range(1..=20); + let (data, expected, limit) = random_test_data(rng, with_limit); + + // println!("\nwith_limit {with_limit}"); + // for (input, blocks) in data.iter().enumerate() { + // println!("intput {input}"); + // for b in blocks { + // println!("{:?}", b.columns()[0].value); + // } + // } + + let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + } + + check_result(got, expected); + + Ok(()) +} + +fn create_pipeline( + ctx: Arc, + data: Vec>, + worker: usize, + block_size: usize, + limit: Option, +) -> Result<(Arc, Receiver>)> { + let mut pipeline = Pipeline::create(); + + let data_type = data[0][0].get_by_offset(0).data_type.clone(); + let source_pipe = create_source_pipe(ctx, data)?; + pipeline.add_pipe(source_pipe); + + let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]); + let 
sort_desc = Arc::new(vec![SortColumnDescription { + offset: 0, + asc: true, + nulls_first: true, + is_nullable: false, + }]); + add_k_way_merge_sort( + &mut pipeline, + schema, + worker, + block_size, + limit, + sort_desc, + false, + true, + )?; + + let (mut rx, sink_pipe) = create_sink_pipe(1)?; + let rx = rx.pop().unwrap(); + pipeline.add_pipe(sink_pipe); + pipeline.set_max_threads(3); + + let settings = ExecutorSettings { + query_id: Arc::new("".to_string()), + max_execute_time_in_seconds: Default::default(), + enable_queries_executor: false, + max_threads: 8, + executor_node_id: "".to_string(), + }; + let executor = QueryPipelineExecutor::create(pipeline, settings)?; + Ok((executor, rx)) +} + +fn create_source_pipe(ctx: Arc, data: Vec>) -> Result { + let size = data.len(); + let mut items = Vec::with_capacity(size); + + for blocks in data.into_iter() { + let output = OutputPort::create(); + items.push(PipeItem::create( + BlocksSource::create( + ctx.clone(), + output.clone(), + Arc::new(Mutex::new(VecDeque::from(blocks))), + )?, + vec![], + vec![output], + )); + } + Ok(Pipe::create(0, size, items)) +} + +fn create_sink_pipe(size: usize) -> Result<(Vec>>, Pipe)> { + let mut rxs = Vec::with_capacity(size); + let mut items = Vec::with_capacity(size); + for _index in 0..size { + let input = InputPort::create(); + let (tx, rx) = channel(1000); + rxs.push(rx); + items.push(PipeItem::create( + ProcessorPtr::create(SyncSenderSink::create(tx, input.clone())), + vec![input], + vec![], + )); + } + + Ok((rxs, Pipe::create(size, 0, items))) +} + +/// Returns (input, expected) +pub fn basic_test_data(limit: Option) -> (Vec>, DataBlock) { + let data = vec![ + vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]], + vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]], + vec![vec![0, 2, 4, 5]], + ]; + + prepare_input_and_result(data, limit) +} + +fn prepare_input_and_result( + data: Vec>>, + limit: Option, +) -> (Vec>, DataBlock) { + let input = data + .clone() + .into_iter() + .map(|v| { + v.into_iter() + .map(|v| DataBlock::new_from_columns(vec![Int32Type::from_data(v)])) + .collect::>() + }) + .collect::>(); + let result = data + .into_iter() + .flatten() + .flatten() + .sorted() + .take(limit.unwrap_or(usize::MAX)) + .collect::>(); + let result = DataBlock::new_from_columns(vec![Int32Type::from_data(result)]); + + (input, result) +} + +fn check_result(result: Vec, expected: DataBlock) { + if expected.is_empty() { + if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() { + panic!( + "\nexpected empty block, but got:\n {}", + pretty_format_blocks(&result).unwrap() + ) + } + return; + } + + let result_rows: usize = result.iter().map(|v| v.num_rows()).sum(); + let result = pretty_format_blocks(&result).unwrap(); + let expected_rows = expected.num_rows(); + let expected = pretty_format_blocks(&[expected]).unwrap(); + assert_eq!( + expected, result, + "\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}", + expected_rows, expected, result_rows, result + ); +} + +fn random_test_data( + rng: &mut ThreadRng, + with_limit: bool, +) -> (Vec>, DataBlock, Option) { + let random_batch_size = rng.gen_range(1..=10); + let random_num_streams = rng.gen_range(5..=10); + + let random_data = (0..random_num_streams) + .map(|_| { + let random_num_blocks = rng.gen_range(1..=10); + let mut data = (0..random_batch_size * random_num_blocks) + .map(|_| rng.gen_range(0..=1000)) + .collect::>(); + data.sort(); + data.chunks(random_batch_size) + .map(|v| v.to_vec()) + .collect::>() + }) + .collect::>(); + + let num_rows 
= random_data + .iter() + .map(|v| v.iter().map(|v| v.len()).sum::()) + .sum::(); + let limit = if with_limit { + Some(rng.gen_range(0..=num_rows)) + } else { + None + }; + let (input, expected) = prepare_input_and_result(random_data, limit); + (input, expected, limit) +} diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 1ac0adf1ec20..bef9c5ae9dbc 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -879,6 +879,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_parallel_multi_merge_sort", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enables parallel multi merge sort", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("enable_last_snapshot_location_hint", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enables writing last_snapshot_location_hint object", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0b39c89bf14c..521fa92c11d4 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -721,6 +721,10 @@ impl Settings { Ok(self.try_get_u64("enable_loser_tree_merge_sort")? == 1) } + pub fn get_enable_parallel_multi_merge_sort(&self) -> Result { + Ok(self.try_get_u64("enable_parallel_multi_merge_sort")? == 1) + } + pub fn get_format_null_as_str(&self) -> Result { Ok(self.try_get_u64("format_null_as_str")? == 1) } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test index 248f4ca5816d..847b0de373de 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test @@ -835,17 +835,10 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t15') statement ok alter table t15 recluster -query TTIIFFT -select * exclude(timestamp) from clustering_information('db_09_0008','t15') ----- -(abs(a)) linear 3 0 0.0 1.0 {"00001":3} - -query III -select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 3 +query TTFF +select cluster_key, type, average_overlaps, average_depth from clustering_information('db_09_0008','t15') ---- -1 3 9 -2 3 9 -4 4 9 +(abs(a)) linear 0.0 1.0 statement ok diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test index 4d09b40b950b..217db4cbd587 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test @@ -76,9 +76,9 @@ statement ok ALTER TABLE t3 RECLUSTER query FFT -select average_overlaps, average_depth, block_depth_histogram from clustering_information('db_09_0016','t3') +select average_overlaps, average_depth from clustering_information('db_09_0016','t3') ---- -0.0 1.0 {"00001":2} +0.0 1.0 # test trim string statement ok diff --git a/tests/sqllogictests/suites/base/20+_others/20_0013_query_result_cache.test b/tests/sqllogictests/suites/base/20+_others/20_0013_query_result_cache.test index 1905c042f3f1..d2fb2e8c3466 100644 --- 
a/tests/sqllogictests/suites/base/20+_others/20_0013_query_result_cache.test +++ b/tests/sqllogictests/suites/base/20+_others/20_0013_query_result_cache.test @@ -13,6 +13,9 @@ CREATE TABLE IF NOT EXISTS t1 (a INT not null); statement ok INSERT INTO t1 VALUES (1), (2), (3); +statement ok +set enable_parallel_multi_merge_sort = 0; + query I SELECT * FROM t1 ORDER BY a; ---- diff --git a/tests/sqllogictests/suites/mode/standalone/explain/clustering.test b/tests/sqllogictests/suites/mode/standalone/explain/clustering.test index 00f9c4e6c7bd..06828b11dff7 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/clustering.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/clustering.test @@ -66,7 +66,7 @@ ALTER TABLE test_hilbert RECLUSTER FINAL; query TTIIFFT select * exclude(timestamp) from clustering_information('default','test_hilbert') ---- -(a, b) hilbert 2 0 0.0 1.0 {"00001":2} +(a, b) hilbert 2 1 0.0 1.0 {"00001":2} query T EXPLAIN SELECT * FROM test_hilbert WHERE a = 1; @@ -96,7 +96,7 @@ Filter └── TableScan ├── table: default.default.test_hilbert ├── output columns: [a (#0), b (#1)] - ├── read rows: 2 + ├── read rows: 3 ├── read size: < 1 KiB ├── partitions total: 2 ├── partitions scanned: 1 diff --git a/tests/sqllogictests/suites/mode/standalone/explain/sort.test b/tests/sqllogictests/suites/mode/standalone/explain/sort.test index 5e3a47907db0..a501abeb3f79 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/sort.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/sort.test @@ -75,12 +75,14 @@ query T explain pipeline select a, b from t1 order by a; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Sort spilling @@ -91,13 +93,15 @@ query T explain pipeline select a, b from t1 order by a; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortSpill × 4 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok set sort_spilling_memory_ratio = 0; @@ -107,13 +111,15 @@ query T explain pipeline select a + 1, b from t1 order by a + 1; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - CompoundBlockOperator(Map) × 1 - DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Sort spilling @@ -124,14 +130,16 @@ query T explain pipeline select a + 1, b from t1 order by a + 1; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortSpill × 4 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - CompoundBlockOperator(Map) × 1 - 
DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok drop table if exists t1; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index 24089fc57691..97f9e3c49360 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -44,6 +44,9 @@ set max_threads=4; statement ok set sort_spilling_memory_ratio = 0; +statement ok +set enable_parallel_multi_merge_sort = 0; + query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ----
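The explain output above shows the new topology: one KWayMergePartitioner feeds several KWayMergeWorker processors, and a single KWayMergeCombiner reassembles their output in task order. The sketch below illustrates the invariant the partitioner has to establish; `split_runs_at` and the data are made up for illustration and are not part of this patch, and the patch's list_domain.rs presumably searches for a cutoff that yields roughly the requested number of rows rather than taking one as input. Once every run is split at a common cutoff, the rows at or below the cutoff form a task that one worker can merge with no coordination, because every row left behind is strictly greater.

/// Split each sorted run at a common cutoff: rows <= cutoff go to the current
/// task, rows > cutoff stay for later tasks. Because runs are sorted, the task
/// can be merged by one worker independently of everything after it.
fn split_runs_at(runs: &[Vec<i32>], cutoff: i32) -> Vec<usize> {
    runs.iter()
        .map(|run| run.partition_point(|x| *x <= cutoff))
        .collect()
}

fn main() {
    let runs = vec![vec![1, 2, 3, 4], vec![1, 1, 1, 1], vec![0, 2, 4, 5]];
    let splits = split_runs_at(&runs, 2);

    // One "worker" merges only the slices that fall inside the task...
    let mut task: Vec<i32> = runs
        .iter()
        .zip(&splits)
        .flat_map(|(run, &end)| run[..end].iter().copied())
        .collect();
    task.sort_unstable();

    // ...and the "combiner" may emit it before any later task is produced,
    // since every remaining row is strictly greater than the cutoff.
    assert!(runs
        .iter()
        .zip(&splits)
        .all(|(run, &end)| run[end..].iter().all(|x| *x > 2)));
    assert_eq!(task, vec![0, 1, 1, 1, 1, 1, 2, 2]);
}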