Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expand streams after agg #2530

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,12 @@ void DAGQueryBlockInterpreter::executeAggregation(DAGPipeline & pipeline, const
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
before_agg_streams = pipeline.streams.size();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(pipeline.streams, stream_with_non_joined_data,
params, context.getFileProvider(), true, max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads));

pipeline.streams.resize(1);
}
else
Expand Down Expand Up @@ -1471,6 +1471,17 @@ BlockInputStreams DAGQueryBlockInterpreter::execute()
}
}

/// expand concurrency after agg
if(!query_block.isRootQueryBlock() && before_agg_streams > 1 && pipeline.streams.size()==1)
{
size_t concurrency = before_agg_streams;
BlockInputStreamPtr shared_query_block_input_stream
= std::make_shared<SharedQueryBlockInputStream>(concurrency * 5, pipeline.firstStream());
pipeline.streams.clear();
for (size_t i = 0; i < concurrency; i++)
pipeline.streams.push_back(std::make_shared<SimpleBlockInputStream>(shared_query_block_input_stream));
}

return pipeline.streams;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class DAGQueryBlockInterpreter
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;

/// How many streams before aggregation
size_t before_agg_streams = 1;

/// Table from where to read data, if not subquery.
ManageableStoragePtr storage;
TableLockHolder table_drop_lock;
Expand Down