feat(query): parallel k way merge sort (#16340)
* merge_path

Signed-off-by: coldWater <forsaken628@gmail.com>

* KWaySortPartition

Signed-off-by: coldWater <forsaken628@gmail.com>

* KWayMergeWorkerProcessor

Signed-off-by: coldWater <forsaken628@gmail.com>

* stream

Signed-off-by: coldWater <forsaken628@gmail.com>

* BlockStream

Signed-off-by: coldWater <forsaken628@gmail.com>

* update

Signed-off-by: coldWater <forsaken628@gmail.com>

* event

Signed-off-by: coldWater <forsaken628@gmail.com>

* builder

Signed-off-by: coldWater <forsaken628@gmail.com>

* settings

Signed-off-by: coldWater <forsaken628@gmail.com>

* clean

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* bypass some task

Signed-off-by: coldWater <forsaken628@gmail.com>

* test

Signed-off-by: coldWater <forsaken628@gmail.com>

* refine

Signed-off-by: coldWater <forsaken628@gmail.com>

* enable

Signed-off-by: coldWater <forsaken628@gmail.com>

* list domain

Signed-off-by: coldWater <forsaken628@gmail.com>

* refine

Signed-off-by: coldWater <forsaken628@gmail.com>

* integrate

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* search per iter

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* use block meta

Signed-off-by: coldWater <forsaken628@gmail.com>

* rename

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* factor

Signed-off-by: coldWater <forsaken628@gmail.com>

* bump quinn

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix test

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix test

* fix test

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

* with_block_size_hit

Signed-off-by: coldWater <forsaken628@gmail.com>

* fix

Signed-off-by: coldWater <forsaken628@gmail.com>

---------

Signed-off-by: coldWater <forsaken628@gmail.com>
Co-authored-by: zhyass <mytesla@live.com>
Co-authored-by: sundyli <543950155@qq.com>
3 people committed Sep 21, 2024
1 parent 356e5cb commit c5f9329
Showing 26 changed files with 2,136 additions and 146 deletions.
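The first bullet of the commit message, merge_path, names the idea that makes the k-way merge sort parallel: Merge Path partitioning, which cuts the merge of sorted runs into fixed-size chunks that workers can merge independently. A minimal, self-contained sketch of the two-way case (illustrative only, not code from this commit; merge_path_partition is a hypothetical name):

/// Merge Path, two-way case: find (i, j) with i + j = k such that the first
/// i elements of `a` and the first j elements of `b` are exactly the k
/// smallest elements of the merged output, so each chunk can be merged
/// independently of the rest.
fn merge_path_partition(a: &[i64], b: &[i64], k: usize) -> (usize, usize) {
    assert!(k <= a.len() + b.len());
    // Feasible range for i: 0 <= i <= a.len() and 0 <= k - i <= b.len().
    let (mut lo, mut hi) = (k.saturating_sub(b.len()), k.min(a.len()));
    while lo < hi {
        let i = (lo + hi) / 2;
        let j = k - i;
        if i < a.len() && j > 0 && b[j - 1] > a[i] {
            lo = i + 1; // a[i] belongs in the first k: take more from `a`
        } else if j < b.len() && i > 0 && a[i - 1] > b[j] {
            hi = i; // a[i - 1] does not belong: take fewer from `a`
        } else {
            return (i, j); // valid cut: max(taken) <= min(remaining)
        }
    }
    (lo, k - lo)
}

fn main() {
    // The 3 smallest of [1, 3, 5] merged with [2, 4, 6] are {1, 2, 3}.
    assert_eq!(merge_path_partition(&[1, 3, 5], &[2, 4, 6], 3), (2, 1));
}

The KWaySortPartitioner added below generalizes this cut to k input streams, with lower and upper bounds on the task size rather than an exact k.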
10 changes: 5 additions & 5 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
@@ -17,5 +17,6 @@
#![feature(core_intrinsics)]
#![feature(int_roundings)]
#![feature(let_chains)]
+#![feature(iter_map_windows)]

pub mod processors;
@@ -22,14 +22,15 @@ mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
+mod transform_k_way_merge_sort;
mod transform_multi_sort_merge;
mod transform_pipeline_helper;
mod transform_retry_async;
mod transform_sort_merge;
mod transform_sort_merge_base;
mod transform_sort_merge_limit;

pub mod transform_sort_partial;

pub use transform::*;
pub use transform_accumulating::*;
pub use transform_accumulating_async::*;
@@ -39,6 +40,7 @@ pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
+pub use transform_k_way_merge_sort::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_pipeline_helper::TransformPipelineHelper;
pub use transform_retry_async::*;
@@ -27,7 +27,6 @@ where
pub input_index: usize,
pub row_index: usize,

-    num_rows: usize,
_o: PhantomData<O>,

/// rows within [`Cursor`] should be monotonic.
@@ -52,7 +51,7 @@

#[inline]
pub fn is_finished(&self) -> bool {
-        self.num_rows == self.row_index
+        self.rows.len() == self.row_index
}

#[inline]
@@ -62,12 +61,12 @@

#[inline]
pub fn last(&self) -> R::Item<'_> {
-        self.rows.row(self.num_rows - 1)
+        self.rows.last()
}

#[inline]
pub fn num_rows(&self) -> usize {
-        self.num_rows
+        self.rows.len()
}

pub fn cursor_mut(&self) -> CursorMut<'_, R, O> {
@@ -87,7 +86,6 @@ pub trait CursorOrder<R: Rows>: Sized + Copy {
Cursor::<R, Self> {
input_index,
row_index: 0,
-            num_rows: rows.len(),
rows,
_o: PhantomData,
}
@@ -153,7 +151,7 @@
}

pub fn is_finished(&self) -> bool {
-        self.row_index == self.cursor.num_rows
+        self.row_index == self.cursor.rows.len()
}

pub fn current<'b>(&'b self) -> R::Item<'a> {
@@ -0,0 +1,258 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::SortColumnDescription;

use super::list_domain::Candidate;
use super::list_domain::EndDomain;
use super::list_domain::List;
use super::list_domain::Partition;
use super::Rows;
use super::SortedStream;

pub struct KWaySortPartitioner<R, S>
where
R: Rows,
S: SortedStream,
{
schema: DataSchemaRef,
sort_desc: Arc<Vec<SortColumnDescription>>,
unsorted_streams: Vec<S>,
pending_streams: VecDeque<usize>,

buffer: Vec<DataBlock>,
rows: Vec<Option<R>>,
cur_task: usize,

limit: Option<usize>,
total_rows: usize,

// settings
min_task: usize,
max_task: usize,
max_iter: usize,
search_per_iter: usize,
}

impl<R, S> KWaySortPartitioner<R, S>
where
R: Rows,
S: SortedStream,
{
pub fn new(
schema: DataSchemaRef,
streams: Vec<S>,
sort_desc: Arc<Vec<SortColumnDescription>>,
batch_rows: usize,
limit: Option<usize>,
) -> Self {
debug_assert!(streams.len() > 1, "streams.len() = {}", streams.len());

let buffer = vec![DataBlock::empty_with_schema(schema.clone()); streams.len()];
let rows = vec![None; streams.len()];
let pending_streams = (0..streams.len()).collect();

fn get_env<T: std::str::FromStr + Copy>(key: &str, default: T) -> T {
std::env::var(key).map_or(default, |s| s.parse::<T>().unwrap_or(default))
}

let min_task =
(batch_rows as f64 * get_env("K_WAY_MERGE_SORT_MIN_TASK_FACTOR", 2.0)) as usize;
assert!(min_task > 0);

let max_task =
(batch_rows as f64 * get_env("K_WAY_MERGE_SORT_MAX_TASK_FACTOR", 4.0)) as usize;
assert!(max_task > 0);

let max_iter = get_env("K_WAY_MERGE_SORT_MAX_ITER", 20);
assert!(max_iter > 0);

let search_per_iter = get_env("K_WAY_MERGE_SORT_SEARCH_PER_ITER", 4);
assert!(search_per_iter > 0);

Self {
schema,
sort_desc,
unsorted_streams: streams,
pending_streams,
buffer,
rows,
cur_task: 1,
limit,
total_rows: 0,

min_task,
max_task,
max_iter,
search_per_iter,
}
}

pub fn is_finished(&self) -> bool {
self.limit.map_or(false, |limit| self.total_rows >= limit)
|| !self.has_pending_stream() && self.rows.iter().all(|x| x.is_none())
}

pub fn has_pending_stream(&self) -> bool {
!self.pending_streams.is_empty()
}

pub fn poll_pending_stream(&mut self) -> Result<()> {
let mut continue_pendings = Vec::new();
while let Some(i) = self.pending_streams.pop_front() {
debug_assert!(self.buffer[i].is_empty());
let (input, pending) = self.unsorted_streams[i].next()?;
if pending {
continue_pendings.push(i);
continue;
}
if let Some((block, col)) = input {
self.rows[i] = Some(R::from_column(&col, &self.sort_desc)?);
self.buffer[i] = block;
}
}
self.pending_streams.extend(continue_pendings);
Ok(())
}

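    /// Returns the blocks that make up the next merge task. An empty vec means
    /// either that the partitioner is finished, or that some input streams are
    /// still pending and the caller should call again later.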
pub fn next_task(&mut self) -> Result<Vec<DataBlock>> {
if self.is_finished() {
return Ok(vec![]);
}

if self.has_pending_stream() {
self.poll_pending_stream()?;
if self.has_pending_stream() {
return Ok(vec![]);
}
}

Ok(self.build_task())
}

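    /// Searches the buffered streams for a common cut: per-stream end positions
    /// whose combined row count falls within [min_task, max_task], so that
    /// every produced task has a bounded size.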
fn calc_partition_point(&self) -> Partition {
let mut candidate = Candidate::new(&self.rows, EndDomain {
min: self.min_task,
max: self.max_task,
});
candidate.init();

        // if candidate.is_small_task() {
        //     TODO: consider loading multiple blocks at once so we can avoid
        //     cutting tasks that are too small.
        // }

candidate.calc_partition(self.search_per_iter, self.max_iter)
}

fn build_task(&mut self) -> Vec<DataBlock> {
let partition = self.calc_partition_point();

let id = self.next_task_id();
self.total_rows += partition.total;

let task: Vec<_> = partition
.ends
.iter()
.map(|&(input, pp)| {
let mut block = self.slice(input, pp);

block.replace_meta(Box::new(SortTaskMeta {
id,
total: partition.total,
input,
}));

block
})
.collect();
task
}

fn next_task_id(&mut self) -> usize {
let id = self.cur_task;
self.cur_task += 1;
id
}

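    /// Takes the first `pp` rows of stream `i`'s buffered block. When the
    /// buffer is exhausted, the stream is pushed back onto the pending queue
    /// so that `poll_pending_stream` refills it on the next call.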
fn slice(&mut self, i: usize, pp: usize) -> DataBlock {
let block = &self.buffer[i];
let rows = self.rows[i].as_ref();
let n = block.num_rows();

if pp < n {
let first_block = block.slice(0..pp);
self.buffer[i] = block.slice(pp..n);
self.rows[i] = Some(rows.unwrap().slice(pp..n));
first_block
} else {
let first_block = block.clone();
self.buffer[i] = DataBlock::empty_with_schema(self.schema.clone());
self.rows[i] = None;
self.pending_streams.push_back(i);
first_block
}
}
}

impl<R: Rows> List for Option<R> {
type Item<'a> = R::Item<'a> where R: 'a;
fn len(&self) -> usize {
match self {
Some(r) => r.len(),
None => 0,
}
}

fn cmp_value<'a>(&'a self, i: usize, target: &R::Item<'a>) -> Ordering {
self.as_ref().unwrap().row(i).cmp(target)
}

fn index(&self, i: usize) -> R::Item<'_> {
self.as_ref().unwrap().row(i)
}
}

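/// Metadata attached to every block of a task: the task the block belongs to
/// (`id`), the task's total row count (`total`), and the index of the input
/// stream the block was cut from (`input`).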
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SortTaskMeta {
pub id: usize,
pub total: usize,
pub input: usize,
}

impl SortTaskMeta {
pub fn as_meta(self) -> BlockMetaInfoPtr {
Box::new(self)
}
}

#[typetag::serde(name = "sort_task")]
impl BlockMetaInfo for SortTaskMeta {
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
SortTaskMeta::downcast_ref_from(info).map_or(false, |info| self == info)
}

fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
Box::new(*self)
}
}
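
The Candidate / EndDomain search that calc_partition_point delegates to lives in the list_domain module, which is not part of this hunk. The property it establishes is that each task is cut at a common bound across all buffered streams, so a task's rows never interleave with the rows of later tasks. A naive, self-contained sketch of that idea (a binary search over the value space rather than the module's iterative domain narrowing; find_partition and split_points are hypothetical names, not this PR's API):

/// For each sorted list, count how many elements are <= bound.
fn split_points(lists: &[Vec<i64>], bound: i64) -> Vec<usize> {
    lists.iter().map(|l| l.partition_point(|&x| x <= bound)).collect()
}

/// Find per-list end positions whose combined row count lands in
/// [min_rows, max_rows] by binary searching the distinct values. Cutting
/// every list at the same bound guarantees each selected row is <= every
/// remaining row.
fn find_partition(lists: &[Vec<i64>], min_rows: usize, max_rows: usize) -> Option<Vec<usize>> {
    let mut values: Vec<i64> = lists.iter().flatten().copied().collect();
    values.sort_unstable();
    values.dedup();
    let (mut lo, mut hi) = (0, values.len());
    while lo < hi {
        let mid = (lo + hi) / 2;
        let ends = split_points(lists, values[mid]);
        let total: usize = ends.iter().sum();
        if total < min_rows {
            lo = mid + 1; // bound too low: the task would be under min_rows
        } else if total > max_rows {
            hi = mid; // bound too high: the task would exceed max_rows
        } else {
            return Some(ends);
        }
    }
    None // no single bound fits, e.g. a run of duplicates straddles the window
}

fn main() {
    let lists = vec![vec![1, 4, 7, 10], vec![2, 5, 8], vec![3, 6, 9]];
    // A task of 4..=6 rows: cutting at value 6 takes two rows from each list.
    assert_eq!(find_partition(&lists, 4, 6), Some(vec![2, 2, 2]));
}

The implementation in this commit instead narrows an EndDomain iteratively, bounded by the max_iter and search_per_iter settings configured in KWaySortPartitioner::new.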
