Skip to content

Commit

Permalink
expand streams after agg (#2530) (#2537)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jan 28, 2022
1 parent 48a2afc commit 18e3df0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
13 changes: 12 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,12 +895,12 @@ void DAGQueryBlockInterpreter::executeAggregation(DAGPipeline & pipeline, const
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
before_agg_streams = pipeline.streams.size();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(pipeline.streams, stream_with_non_joined_data,
params, context.getFileProvider(), true, max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads));

pipeline.streams.resize(1);
}
else
Expand Down Expand Up @@ -1471,6 +1471,17 @@ BlockInputStreams DAGQueryBlockInterpreter::execute()
}
}

/// expand concurrency after agg
if(!query_block.isRootQueryBlock() && before_agg_streams > 1 && pipeline.streams.size()==1)
{
size_t concurrency = before_agg_streams;
BlockInputStreamPtr shared_query_block_input_stream
= std::make_shared<SharedQueryBlockInputStream>(concurrency * 5, pipeline.firstStream());
pipeline.streams.clear();
for (size_t i = 0; i < concurrency; i++)
pipeline.streams.push_back(std::make_shared<SimpleBlockInputStream>(shared_query_block_input_stream));
}

return pipeline.streams;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class DAGQueryBlockInterpreter
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;

/// How many streams before aggregation
size_t before_agg_streams = 1;

/// Table from where to read data, if not subquery.
ManageableStoragePtr storage;
TableLockHolder table_drop_lock;
Expand Down

0 comments on commit 18e3df0

Please sign in to comment.