Skip to content

Commit

Permalink
Merge branch 'main' into copy_optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 19, 2022
2 parents 26e9207 + bb8a27b commit c40b048
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 709 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use common_base::base::GlobalIORuntime;
use common_base::base::TrySpawn;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::Pipeline;
use common_pipeline_core::SourcePipeBuilder;
use futures::AsyncRead;
use futures_util::stream::FuturesUnordered;
use futures_util::AsyncReadExt;
Expand Down Expand Up @@ -204,17 +202,11 @@ pub trait InputFormatPipe: Sized + Send + 'static {
row_batch_rx: async_channel::Receiver<Self::RowBatch>,
pipeline: &mut Pipeline,
) -> Result<()> {
let mut builder = SourcePipeBuilder::create();
for _ in 0..ctx.settings.get_max_threads()? {
let output = OutputPort::create();
let source = DeserializeSource::<Self>::create(
ctx.clone(),
output.clone(),
row_batch_rx.clone(),
)?;
builder.add_source(output, source);
}
pipeline.add_pipe(builder.finalize());
let max_threads = ctx.settings.get_max_threads()? as usize;
pipeline.add_source(
|output| DeserializeSource::<Self>::create(ctx.clone(), output, row_batch_rx.clone()),
max_threads,
)?;
Ok(())
}

Expand All @@ -223,7 +215,6 @@ pub trait InputFormatPipe: Sized + Send + 'static {
split_rx: async_channel::Receiver<Result<Split<Self>>>,
pipeline: &mut Pipeline,
) -> Result<()> {
let mut builder = SourcePipeBuilder::create();
let n_threads = ctx.settings.get_max_threads()? as usize;
let max_aligner = match ctx.plan {
InputPlan::CopyInto(_) => ctx.splits.len(),
Expand All @@ -236,17 +227,17 @@ pub trait InputFormatPipe: Sized + Send + 'static {
}
};
let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads);
for _ in 0..std::cmp::min(max_aligner, n_threads) {
let output = OutputPort::create();
let source = Aligner::<Self>::try_create(
output.clone(),
ctx.clone(),
split_rx.clone(),
row_batch_tx.clone(),
)?;
builder.add_source(output, source);
}
pipeline.add_pipe(builder.finalize());
pipeline.add_source(
|output| {
Aligner::<Self>::try_create(
output,
ctx.clone(),
split_rx.clone(),
row_batch_tx.clone(),
)
},
std::cmp::min(max_aligner, n_threads),
)?;
pipeline.resize(n_threads)?;
pipeline.add_transform(|input, output| {
DeserializeTransformer::<Self>::create(ctx.clone(), input, output, row_batch_rx.clone())
Expand Down
31 changes: 3 additions & 28 deletions src/query/service/src/api/rpc/exchange/exchange_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use crate::api::rpc::exchange::exchange_params::ExchangeParams;
use crate::api::rpc::exchange::exchange_sink_merge::ExchangeMergeSink;
use crate::api::rpc::exchange::exchange_sink_shuffle::ExchangePublisherSink;
use crate::clusters::ClusterHelper;
use crate::pipelines::processors::port::InputPort;
use crate::pipelines::Pipeline;
use crate::pipelines::SinkPipeBuilder;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

Expand All @@ -46,33 +44,10 @@ impl ExchangeSink {
)));
}

let mut sink_builder = SinkPipeBuilder::create();

for _index in 0..pipeline.output_len() {
let input = InputPort::create();
sink_builder.add_sink(
input.clone(),
ExchangeMergeSink::try_create(ctx.clone(), input.clone(), params)?,
);
}

pipeline.add_pipe(sink_builder.finalize());
Ok(())
}
ExchangeParams::ShuffleExchange(params) => {
let mut sink_builder = SinkPipeBuilder::create();

for _index in 0..pipeline.output_len() {
let input = InputPort::create();
sink_builder.add_sink(
input.clone(),
ExchangePublisherSink::try_create(ctx.clone(), input, params)?,
);
}

pipeline.add_pipe(sink_builder.finalize());
Ok(())
pipeline.add_sink(|input| ExchangeMergeSink::try_create(ctx.clone(), input, params))
}
ExchangeParams::ShuffleExchange(params) => pipeline
.add_sink(|input| ExchangePublisherSink::try_create(ctx.clone(), input, params)),
}
}
}
Loading

0 comments on commit c40b048

Please sign in to comment.