diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2891d3f772b..f35850c1412 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -895,12 +895,12 @@ void DAGQueryBlockInterpreter::executeAggregation(DAGPipeline & pipeline, const /// If there are several sources, then we perform parallel aggregation if (pipeline.streams.size() > 1) { + before_agg_streams = pipeline.streams.size(); BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams); pipeline.firstStream() = std::make_shared(pipeline.streams, stream_with_non_joined_data, params, context.getFileProvider(), true, max_streams, settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads)); - pipeline.streams.resize(1); } else @@ -1471,6 +1471,17 @@ BlockInputStreams DAGQueryBlockInterpreter::execute() } } + /// expand concurrency after agg + if(!query_block.isRootQueryBlock() && before_agg_streams > 1 && pipeline.streams.size()==1) + { + size_t concurrency = before_agg_streams; + BlockInputStreamPtr shared_query_block_input_stream + = std::make_shared(concurrency * 5, pipeline.firstStream()); + pipeline.streams.clear(); + for (size_t i = 0; i < concurrency; i++) + pipeline.streams.push_back(std::make_shared(shared_query_block_input_stream)); + } + return pipeline.streams; } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 449a5471c2d..d172af5f3fd 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -148,6 +148,9 @@ class DAGQueryBlockInterpreter /// How many streams we ask for storage to produce, and in how many threads we will do further processing. size_t max_streams = 1; + /// How many streams before aggregation + size_t before_agg_streams = 1; + /// Table from where to read data, if not subquery. ManageableStoragePtr storage; TableLockHolder table_drop_lock;