refactor(query): use pipeline exchange to refactor window scatter #16471

Merged · 5 commits · Sep 20, 2024
6 changes: 4 additions & 2 deletions src/query/pipeline/core/src/processors/shuffle_processor.rs
@@ -33,9 +33,11 @@ pub enum MultiwayStrategy {
pub trait Exchange: Send + Sync + 'static {
const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;

fn partition(&self, state: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>>;

fn multiway_pick(&self, partitions: &[Option<DataBlock>]) -> Result<usize>;
fn multiway_pick(&self, _partitions: &[Option<DataBlock>]) -> Result<usize> {
unimplemented!()
}
}

pub struct ShuffleProcessor {
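
With `multiway_pick` now carrying a default body, an exchange that keeps the default `MultiwayStrategy::Random` strategy only has to implement `partition`; presumably `multiway_pick` is only invoked for non-random multiway strategies, so the `unimplemented!()` default is never reached in the random case. A minimal sketch of such an implementor is shown below; the row-range splitting and the `DataBlock::num_rows`/`DataBlock::slice` helpers are assumptions for illustration, not code from this PR.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
// Import path assumed; the trait is defined in shuffle_processor.rs inside databend_common_pipeline_core.
use databend_common_pipeline_core::processors::Exchange;

/// Hypothetical exchange that splits every incoming block into `n` row ranges.
pub struct RoundRobinExchange;

impl Exchange for RoundRobinExchange {
    // STRATEGY stays at its default (`MultiwayStrategy::Random`), so the
    // default `multiway_pick` body is never called for this exchange.
    fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
        let rows = data_block.num_rows();
        let chunk = rows.div_ceil(n).max(1);
        Ok((0..n)
            .map(|i| {
                let start = (i * chunk).min(rows);
                let end = ((i + 1) * chunk).min(rows);
                // Trailing outputs simply receive empty blocks when rows run out.
                data_block.slice(start..end)
            })
            .collect())
    }
}
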
78 changes: 27 additions & 51 deletions src/query/service/src/pipelines/builders/builder_window.rs
@@ -22,14 +22,13 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::SortColumnDescription;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::Pipe;
use databend_common_sql::executor::physical_plans::Window;
use databend_common_sql::executor::physical_plans::WindowPartition;

use crate::pipelines::processors::transforms::FrameBound;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
use crate::pipelines::processors::transforms::TransformWindowPartitionScatter;
use crate::pipelines::processors::transforms::WindowFunctionInfo;
use crate::pipelines::processors::transforms::WindowPartitionExchange;
use crate::pipelines::processors::transforms::WindowSpillSettings;
use crate::pipelines::processors::TransformWindow;
use crate::pipelines::PipelineBuilder;
@@ -149,11 +148,13 @@ impl PipelineBuilder {
let window_spill_settings = WindowSpillSettings::new(settings.clone(), num_processors)?;

let plan_schema = window_partition.output_schema()?;

let partition_by = window_partition
.partition_by
.iter()
.map(|index| plan_schema.index_of(&index.to_string()))
.collect::<Result<Vec<_>>>()?;

let sort_desc = window_partition
.order_by
.iter()
@@ -167,59 +168,34 @@
})
})
.collect::<Result<Vec<_>>>()?;

let have_order_col = window_partition.after_exchange.unwrap_or(false);

// 1. Build window partition scatter processors.
let mut pipe_items = Vec::with_capacity(num_processors);
for _ in 0..num_processors {
let processor = TransformWindowPartitionScatter::new(
num_processors,
num_partitions,
partition_by.clone(),
)?;
pipe_items.push(processor.into_pipe_item());
}
self.main_pipeline.add_pipe(Pipe::create(
self.main_pipeline.exchange(
num_processors,
num_processors * num_processors,
pipe_items,
));

// 2. Build shuffle processor.
let mut rule = Vec::with_capacity(num_processors * num_processors);
for i in 0..num_processors * num_processors {
rule.push(
(i * num_processors + i / num_processors) % (num_processors * num_processors),
);
}
self.main_pipeline.reorder_inputs(rule);
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
);

// 3. Build window partition collect processors.
let processor_id = AtomicUsize::new(0);
let mut pipe_items = Vec::with_capacity(num_processors);
for _ in 0..num_processors {
let processor = TransformWindowPartitionCollect::new(
self.ctx.clone(),
processor_id.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
num_processors,
num_partitions,
window_spill_settings.clone(),
sort_desc.clone(),
plan_schema.clone(),
max_block_size,
sort_block_size,
sort_spilling_batch_bytes,
enable_loser_tree,
have_order_col,
)?;
pipe_items.push(processor.into_pipe_item());
}
self.main_pipeline.add_pipe(Pipe::create(
num_processors * num_processors,
num_processors,
pipe_items,
));

Ok(())
self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(Box::new(
TransformWindowPartitionCollect::new(
self.ctx.clone(),
input,
output,
processor_id.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
num_processors,
num_partitions,
window_spill_settings.clone(),
sort_desc.clone(),
plan_schema.clone(),
max_block_size,
sort_block_size,
sort_spilling_batch_bytes,
enable_loser_tree,
have_order_col,
)?,
)))
})
}
}
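
The hand-rolled scatter pipe, the N x N shuffle reorder rule, and the explicit collect `Pipe` are replaced by one `exchange` call on the pipeline plus a plain `add_transform` for the collector. The hashing performed inside `WindowPartitionExchange` is not visible in this diff; the standalone sketch below only illustrates the modulo routing that the collect side depends on (its `partition % num_processors == processor_id` filter appears in the next file), with hypothetical helper names.

/// Partition `p` is collected by processor `p % num_processors`.
fn processor_for_partition(partition: usize, num_processors: usize) -> usize {
    partition % num_processors
}

/// Mirrors the filter used when building `TransformWindowPartitionCollect`.
fn partitions_for_processor(
    processor_id: usize,
    num_processors: usize,
    num_partitions: usize,
) -> Vec<usize> {
    (0..num_partitions)
        .filter(|&p| p % num_processors == processor_id)
        .collect()
}

fn main() {
    // With 4 processors and 16 partitions, processor 1 collects partitions 1, 5, 9 and 13.
    assert_eq!(partitions_for_processor(1, 4, 16), vec![1, 5, 9, 13]);
    assert_eq!(processor_for_partition(9, 4), 1);
    println!("routing check passed");
}

Because each partition is owned by exactly one collector, the exchange can route blocks independently and no cross-processor coordination is needed after the shuffle.
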
@@ -13,11 +13,11 @@
// limitations under the License.

mod transform_window_partition_collect;
mod transform_window_partition_scatter;
mod window_partition_buffer;
mod window_partition_exchange;
mod window_partition_meta;

pub use transform_window_partition_collect::*;
pub use transform_window_partition_scatter::*;
pub use window_partition_buffer::*;
pub use window_partition_exchange::*;
pub use window_partition_meta::*;
transform_window_partition_collect.rs
@@ -29,8 +29,6 @@ use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_transforms::processors::sort_merge;

use super::WindowPartitionBuffer;
@@ -58,7 +56,7 @@
}

pub struct TransformWindowPartitionCollect {
inputs: Vec<Arc<InputPort>>,
input: Arc<InputPort>,
output: Arc<OutputPort>,

restored_data_blocks: Vec<DataBlock>,
@@ -86,6 +84,8 @@ impl TransformWindowPartitionCollect {
#[allow(clippy::too_many_arguments)]
pub fn new(
ctx: Arc<QueryContext>,
input: Arc<InputPort>,
output: Arc<OutputPort>,
processor_id: usize,
num_processors: usize,
num_partitions: usize,
@@ -98,9 +98,6 @@ impl TransformWindowPartitionCollect {
enable_loser_tree: bool,
have_order_col: bool,
) -> Result<Self> {
let inputs = (0..num_processors).map(|_| InputPort::create()).collect();
let output = OutputPort::create();

// Calculate the partition ids collected by the processor.
let partitions: Vec<usize> = (0..num_partitions)
.filter(|&partition| partition % num_processors == processor_id)
@@ -117,38 +114,29 @@
WindowPartitionBuffer::new(ctx, partitions.len(), sort_block_size, spill_settings)?;

Ok(Self {
inputs,
input,
output,
output_data_blocks: VecDeque::new(),
restored_data_blocks: Vec::new(),
partition_id,
buffer,
sort_desc,
schema,
max_block_size,
sort_spilling_batch_bytes,
enable_loser_tree,
have_order_col,
step: Step::Sync(SyncStep::Collect),
enable_loser_tree,
sort_spilling_batch_bytes,
is_collect_finished: false,
output_data_blocks: VecDeque::new(),
restored_data_blocks: Vec::new(),
step: Step::Sync(SyncStep::Collect),
})
}

pub fn into_pipe_item(self) -> PipeItem {
let inputs = self.inputs.clone();
let outputs = vec![self.output.clone()];
let processor_ptr = ProcessorPtr::create(Box::new(self));
PipeItem::create(processor_ptr, inputs, outputs)
}

fn next_step(&mut self, step: Step) -> Result<Event> {
let event = match step {
Step::Sync(_) => Event::Sync,
Step::Async(_) => Event::Async,
Step::Finish => {
for input in self.inputs.iter() {
input.finish();
}
self.input.finish();
self.output.finish();
Event::Finished
}
@@ -158,41 +146,36 @@
}

fn collect(&mut self) -> Result<Event> {
let mut finished_input = 0;
for input in self.inputs.iter() {
if input.is_finished() {
finished_input += 1;
continue;
}

if input.has_data() {
Self::collect_data_block(
input.pull_data().unwrap()?,
&self.partition_id,
&mut self.buffer,
);
}
if self.output.is_finished() {
self.input.finish();
return self.next_step(Step::Finish);
}

if input.is_finished() {
finished_input += 1;
} else {
input.set_need_data();
}
// First check. flush memory data to external storage if need
if self.need_spill() {
return self.next_step(Step::Async(AsyncStep::Spill));
}

if finished_input == self.inputs.len() {
self.is_collect_finished = true;
if self.input.has_data() {
Self::collect_data_block(
self.input.pull_data().unwrap()?,
&self.partition_id,
&mut self.buffer,
);
}

// Check again. flush memory data to external storage if need
if self.need_spill() {
return self.next_step(Step::Async(AsyncStep::Spill));
}

if self.is_collect_finished {
self.next_step(Step::Async(AsyncStep::Restore))
} else {
Ok(Event::NeedData)
if self.input.is_finished() {
self.is_collect_finished = true;
return self.next_step(Step::Async(AsyncStep::Restore));
}

self.input.set_need_data();
Ok(Event::NeedData)
}

fn output(&mut self) -> Result<Event> {
@@ -213,10 +196,9 @@
return Ok(Event::NeedConsume);
}

if !self.buffer.is_empty() {
self.next_step(Step::Async(AsyncStep::Restore))
} else {
self.next_step(Step::Finish)
match self.buffer.is_empty() {
true => self.next_step(Step::Finish),
false => self.next_step(Step::Async(AsyncStep::Restore)),
}
}
}
@@ -232,28 +214,28 @@ impl Processor for TransformWindowPartitionCollect {
}

fn event(&mut self) -> Result<Event> {
// (collect <--> spill) -> (sort <--> restore) -> finish
match self.step {
Step::Sync(sync_step) => match sync_step {
SyncStep::Collect => self.collect(),
SyncStep::Sort => self.output(),
},
Step::Async(async_step) => match async_step {
AsyncStep::Spill => {
if self.need_spill() {
self.next_step(Step::Async(AsyncStep::Spill))
} else if !self.is_collect_finished {
self.collect()
} else {
AsyncStep::Spill => match self.is_collect_finished {
true => {
self.step = Step::Sync(SyncStep::Sort);
self.output()
}
}
AsyncStep::Restore => {
if !self.restored_data_blocks.is_empty() {
self.next_step(Step::Sync(SyncStep::Sort))
} else {
self.next_step(Step::Finish)
false => {
// collect data again.
self.step = Step::Sync(SyncStep::Collect);
self.collect()
}
}
},
AsyncStep::Restore => match self.restored_data_blocks.is_empty() {
true => self.next_step(Step::Finish),
false => self.next_step(Step::Sync(SyncStep::Sort)),
},
},
Step::Finish => Ok(Event::Finished),
}
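
The collect transform now reads from a single input port and drives itself through the state machine summarised by the `(collect <--> spill) -> (sort <--> restore) -> finish` comment in `event()`. The sketch below is a simplified standalone model of those transitions, not databend code: it folds `is_collect_finished` into an `input_finished` flag and ignores output back-pressure, but the step names and the overall flow mirror the diff.

/// Simplified model of the processor's phases.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Step {
    Collect, // sync: pull blocks from the single input into the buffer
    Spill,   // async: flush buffered data to external storage when memory is tight
    Restore, // async: read spilled partitions back
    Sort,    // sync: merge-sort restored blocks and push them to the output
    Finish,
}

fn next(step: Step, need_spill: bool, input_finished: bool, restored_empty: bool) -> Step {
    match step {
        Step::Collect if need_spill => Step::Spill,
        Step::Collect if input_finished => Step::Restore,
        Step::Collect => Step::Collect,
        // After a spill, either keep collecting or start draining partitions.
        Step::Spill if input_finished => Step::Sort,
        Step::Spill => Step::Collect,
        Step::Restore if restored_empty => Step::Finish,
        Step::Restore => Step::Sort,
        // After sorting and emitting a batch, restore the next one.
        Step::Sort => Step::Restore,
        Step::Finish => Step::Finish,
    }
}

fn main() {
    // One possible trajectory: Spill, Collect, Restore, Sort, Restore, Finish.
    let mut step = Step::Collect;
    for (need_spill, input_finished, restored_empty) in [
        (true, false, false),
        (false, false, false),
        (false, true, false),
        (false, true, false),
        (false, true, false),
        (false, true, true),
    ] {
        step = next(step, need_spill, input_finished, restored_empty);
        println!("{step:?}");
    }
    assert_eq!(step, Step::Finish);
}

In the real processor these phases map onto `Event::Sync` for `Collect`/`Sort` and `Event::Async` for `Spill`/`Restore`, as the `next_step` helper in the diff shows.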