From 33d33c663cd3fd81f4a151c8e722f7d04cbe8614 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 1 Jul 2022 14:29:20 +0800 Subject: [PATCH 01/16] speed up agg with right join Signed-off-by: gengliqi --- .../ParallelAggregatingBlockInputStream.cpp | 7 +- .../ParallelAggregatingBlockInputStream.h | 2 +- .../src/DataStreams/ParallelInputsProcessor.h | 155 +++++++++++------- dbms/src/DataStreams/UnionBlockInputStream.h | 2 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 3 +- .../Interpreters/InterpreterSelectQuery.cpp | 8 +- 6 files changed, 112 insertions(+), 65 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index f983de91b37..cd9d6235f52 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -24,7 +24,7 @@ namespace DB { ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream( const BlockInputStreams & inputs, - const BlockInputStreamPtr & additional_input_at_end, + const BlockInputStreams & additional_inputs_at_end, const Aggregator::Params & params_, const FileProviderPtr & file_provider_, bool final_, @@ -41,11 +41,10 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream( , keys_size(params.keys_size) , aggregates_size(params.aggregates_size) , handler(*this) - , processor(inputs, additional_input_at_end, max_threads, handler, log) + , processor(inputs, additional_inputs_at_end, max_threads, handler, log) { children = inputs; - if (additional_input_at_end) - children.push_back(additional_input_at_end); + children.insert(children.end(), additional_inputs_at_end.begin(), additional_inputs_at_end.end()); } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 41e61786370..907622c8364 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h 
+++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -36,7 +36,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream */ ParallelAggregatingBlockInputStream( const BlockInputStreams & inputs, - const BlockInputStreamPtr & additional_input_at_end, + const BlockInputStreams & additional_inputs_at_end, const Aggregator::Params & params_, const FileProviderPtr & file_provider_, bool final_, diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 34c70a7085e..72d414b9fdc 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -83,9 +83,8 @@ template class ParallelInputsProcessor { public: - /** additional_input_at_end - if not nullptr, - * then the blocks from this source will start to be processed only after all other sources are processed. - * This is done in the main thread. + /** additional_inputs_at_end - if not empty, + * then the blocks from these sources will start to be processed only after all other sources are processed.
* * Intended for implementation of FULL and RIGHT JOIN * - where you must first make JOIN in parallel, while noting which keys are not found, @@ -93,12 +92,12 @@ class ParallelInputsProcessor */ ParallelInputsProcessor( const BlockInputStreams & inputs_, - const BlockInputStreamPtr & additional_input_at_end_, + const BlockInputStreams & additional_inputs_at_end_, size_t max_threads_, Handler & handler_, const LoggerPtr & log_) : inputs(inputs_) - , additional_input_at_end(additional_input_at_end_) + , additional_inputs_at_end(additional_inputs_at_end_) , max_threads(std::min(inputs_.size(), max_threads_)) , handler(handler_) , log(log_) @@ -125,6 +124,10 @@ class ParallelInputsProcessor if (!thread_manager) thread_manager = newThreadManager(); active_threads = max_threads; + { + std::lock_guard lock(running_first_mutex); + running_first = max_threads; + } for (size_t i = 0; i < max_threads; ++i) thread_manager->schedule(true, handler.getName(), [this, i] { this->thread(i); }); } @@ -136,21 +139,12 @@ class ParallelInputsProcessor for (auto & input : inputs) { - if (IProfilingBlockInputStream * child = dynamic_cast(&*input)) - { - try - { - child->cancel(kill); - } - catch (...) - { - /** If you can not ask one or more sources to stop. - * (for example, the connection is broken for distributed query processing) - * - then do not care. - */ - LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName()); - } - } + cancelStream(input, kill); + } + + for (auto & input : additional_inputs_at_end) + { + cancelStream(input, kill); } } @@ -188,6 +182,24 @@ class ParallelInputsProcessor {} }; + void cancelStream(const BlockInputStreamPtr & stream, bool kill) { + if (auto * child = dynamic_cast(&*stream)) + { + try + { + child->cancel(kill); + } + catch (...) + { + /** If you can not ask one or more sources to stop. + * (for example, the connection is broken for distributed query processing) + * - then do not care. 
+ */ + LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName()); + } + } + } + void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num) { if constexpr (mode == StreamUnionMode::Basic) @@ -205,27 +217,6 @@ class ParallelInputsProcessor try { - while (!finish) - { - InputData unprepared_input; - { - std::lock_guard lock(unprepared_inputs_mutex); - - if (unprepared_inputs.empty()) - break; - - unprepared_input = unprepared_inputs.front(); - unprepared_inputs.pop(); - } - - unprepared_input.in->readPrefix(); - - { - std::lock_guard lock(available_inputs_mutex); - available_inputs.push(unprepared_input); - } - } - loop(thread_num); } catch (...) @@ -240,35 +231,81 @@ class ParallelInputsProcessor handler.onFinishThread(thread_num); - /// The last thread on the output indicates that there is no more data. - if (0 == --active_threads) { - /// And then it processes an additional source, if there is one. - if (additional_input_at_end) + std::unique_lock lock(running_first_mutex); + if (0 == --running_first) { - try + /// Only one thread can go here so don't need to hold `unprepared_inputs_mutex` + /// or `unprepared_inputs_mutex` lock. + if (finish) { - additional_input_at_end->readPrefix(); - while (Block block = additional_input_at_end->read()) - publishPayload(additional_input_at_end, block, thread_num); + return; } - catch (...) + else if (additional_inputs_at_end.empty()) { - exception = std::current_exception(); + handler.onFinish(); + return; } - if (exception) - { - handler.onException(exception, thread_num); + assert(unprepared_inputs.empty() && available_inputs.empty()); + for (size_t i = 0; i < additional_inputs_at_end.size(); ++i) + unprepared_inputs.emplace(additional_inputs_at_end[i], i); + + wait_first_done.notify_all(); + } + else + { + if (additional_inputs_at_end.empty()) { + return; } + wait_first_done.wait(lock, [this] { + return running_first == 0; + }); } + } + + try + { + loop(thread_num); + } + catch (...) 
+ { + exception = std::current_exception(); + } + if (exception) + { + handler.onException(exception, thread_num); + } + + if (0 == --active_threads) { handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. } } void loop(size_t thread_num) { + while (!finish) + { + InputData unprepared_input; + { + std::lock_guard lock(unprepared_inputs_mutex); + + if (unprepared_inputs.empty()) + break; + + unprepared_input = unprepared_inputs.front(); + unprepared_inputs.pop(); + } + + unprepared_input.in->readPrefix(); + + { + std::lock_guard lock(available_inputs_mutex); + available_inputs.push(unprepared_input); + } + } + while (!finish) /// You may need to stop work earlier than all sources run out. { InputData input; @@ -318,8 +355,8 @@ class ParallelInputsProcessor } } - BlockInputStreams inputs; - BlockInputStreamPtr additional_input_at_end; + const BlockInputStreams inputs; + const BlockInputStreams additional_inputs_at_end; unsigned max_threads; Handler & handler; @@ -359,6 +396,12 @@ class ParallelInputsProcessor /// For operations with unprepared_inputs. std::mutex unprepared_inputs_mutex; + /// For waiting all `inputs` sources work done. + /// After that, the `additional_inputs_at_end` sources can be processed. + std::mutex running_first_mutex; + std::condition_variable wait_first_done; + size_t running_first{0}; + /// How many sources ran out. std::atomic active_threads{0}; /// Finish the threads work (before the sources run out). 
diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index a782c3dd087..02688d0e09b 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -101,7 +101,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream : output_queue(std::min(inputs.size(), max_threads) * 5) // reduce contention , log(Logger::get(NAME, req_id)) , handler(*this) - , processor(inputs, additional_input_at_end, max_threads, handler, log) + , processor(inputs, additional_input_at_end ? BlockInputStreams{additional_input_at_end} : BlockInputStreams{}, max_threads, handler, log) , exception_callback(exception_callback_) { // TODO: assert capacity of output_queue is not less than processor.getMaxThreads() diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index e322a830744..049d6bcba70 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -386,10 +386,9 @@ void DAGQueryBlockInterpreter::executeAggregation( if (pipeline.streams.size() > 1) { const Settings & settings = context.getSettingsRef(); - BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); pipeline.firstStream() = std::make_shared( pipeline.streams, - stream_with_non_joined_data, + pipeline.streams_with_non_joined_data, params, context.getFileProvider(), true, diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index fe8f04427a0..02704142d11 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -989,9 +989,15 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre /// If there are several sources, then we perform parallel aggregation if 
(pipeline.streams.size() > 1) { + BlockInputStreams additional_inputs; + if (pipeline.stream_with_non_joined_data) + { + additional_inputs.push_back(pipeline.stream_with_non_joined_data); + } + pipeline.firstStream() = std::make_shared( pipeline.streams, - pipeline.stream_with_non_joined_data, + additional_inputs, params, file_provider, final, From f3ef670e6c592048864e1ab47c63685c0b2d4924 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 1 Jul 2022 16:41:35 +0800 Subject: [PATCH 02/16] fix Signed-off-by: gengliqi --- dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 049d6bcba70..4ba2ba6c292 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -396,6 +396,7 @@ void DAGQueryBlockInterpreter::executeAggregation( settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), log->identifier()); pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); // should record for agg before restore concurrency. See #3804. 
recordProfileStreams(pipeline, query_block.aggregation_name); restorePipelineConcurrency(pipeline); From 858f851a7a52500d002fad5c1391795436fd77f9 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 1 Jul 2022 17:59:52 +0800 Subject: [PATCH 03/16] call OnFinish even if finish is true Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 72d414b9fdc..b3480e79b76 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -231,23 +231,23 @@ class ParallelInputsProcessor handler.onFinishThread(thread_num); + if (additional_inputs_at_end.empty()) + { + if (0 == --active_threads) + { + handler.onFinish(); + } + } + { std::unique_lock lock(running_first_mutex); if (0 == --running_first) { /// Only one thread can go here so don't need to hold `unprepared_inputs_mutex` /// or `unprepared_inputs_mutex` lock. - if (finish) - { - return; - } - else if (additional_inputs_at_end.empty()) - { - handler.onFinish(); - return; - } - - assert(unprepared_inputs.empty() && available_inputs.empty()); + /// If a error happens, the `unprepared_inputs` and `available_inputs` may not be empty. 
+ unprepared_inputs = UnpreparedInputs{}; + available_inputs = AvailableInputs{}; for (size_t i = 0; i < additional_inputs_at_end.size(); ++i) unprepared_inputs.emplace(additional_inputs_at_end[i], i); @@ -255,9 +255,6 @@ class ParallelInputsProcessor } else { - if (additional_inputs_at_end.empty()) { - return; - } wait_first_done.wait(lock, [this] { return running_first == 0; }); From 5659b0002ba88102ebdf6f35353b9fda1eec95e3 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 4 Jul 2022 13:20:15 +0800 Subject: [PATCH 04/16] tiny fix Signed-off-by: gengliqi --- dbms/src/DataStreams/ParallelInputsProcessor.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index b3480e79b76..9d94943e804 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -237,6 +237,7 @@ class ParallelInputsProcessor { handler.onFinish(); } + return; } { @@ -245,7 +246,7 @@ class ParallelInputsProcessor { /// Only one thread can go here so don't need to hold `unprepared_inputs_mutex` /// or `unprepared_inputs_mutex` lock. - /// If a error happens, the `unprepared_inputs` and `available_inputs` may not be empty. + /// If an error has occurred, the `unprepared_inputs` and `available_inputs` may not be empty. 
unprepared_inputs = UnpreparedInputs{}; available_inputs = AvailableInputs{}; for (size_t i = 0; i < additional_inputs_at_end.size(); ++i) From 87117cb56b92b594153403a3c26f7e8697eb2ae4 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 5 Jul 2022 14:12:00 +0800 Subject: [PATCH 05/16] address comment Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 9d94943e804..266033b37c8 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -182,12 +182,13 @@ class ParallelInputsProcessor {} }; - void cancelStream(const BlockInputStreamPtr & stream, bool kill) { - if (auto * child = dynamic_cast(&*stream)) + void cancelStream(const BlockInputStreamPtr & stream, bool kill) + { + if (auto * p_stream = dynamic_cast(&*stream)) { try { - child->cancel(kill); + p_stream->cancel(kill); } catch (...) { @@ -195,7 +196,7 @@ class ParallelInputsProcessor * (for example, the connection is broken for distributed query processing) * - then do not care. */ - LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName()); + LOG_FMT_ERROR(log, "Exception while cancelling {}", p_stream->getName()); } } } @@ -213,21 +214,8 @@ class ParallelInputsProcessor void thread(size_t thread_num) { - std::exception_ptr exception; - - try - { - loop(thread_num); - } - catch (...) - { - exception = std::current_exception(); - } - - if (exception) - { - handler.onException(exception, thread_num); - } + /// Handle `inputs`. + work(thread_num); handler.onFinishThread(thread_num); @@ -262,6 +250,19 @@ class ParallelInputsProcessor } } + /// Handle `additional_inputs_at_end`. 
+ work(thread_num); + + if (0 == --active_threads) + { + handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. + } + } + + void work(size_t thread_num) + { + std::exception_ptr exception; + try { loop(thread_num); @@ -275,10 +276,6 @@ class ParallelInputsProcessor { handler.onException(exception, thread_num); } - - if (0 == --active_threads) { - handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. - } } void loop(size_t thread_num) From 37536032eddc95a813b77d233e4316735e54c410 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 5 Jul 2022 17:28:55 +0800 Subject: [PATCH 06/16] use mpmcqueue as available_inputs Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 71 ++++++++----------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 266033b37c8..a84e883056f 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -98,10 +99,12 @@ class ParallelInputsProcessor const LoggerPtr & log_) : inputs(inputs_) , additional_inputs_at_end(additional_inputs_at_end_) - , max_threads(std::min(inputs_.size(), max_threads_)) + , max_threads(std::min(std::max(inputs_.size(), additional_inputs_at_end_.size()), max_threads_)) , handler(handler_) + , available_inputs_ptr(std::make_unique(inputs_.size())) , log(log_) { + active_streams = inputs_.size(); for (size_t i = 0; i < inputs_.size(); ++i) unprepared_inputs.emplace(inputs_[i], i); } @@ -236,7 +239,9 @@ class ParallelInputsProcessor /// or `unprepared_inputs_mutex` lock. /// If an error has occurred, the `unprepared_inputs` and `available_inputs` may not be empty. 
unprepared_inputs = UnpreparedInputs{}; - available_inputs = AvailableInputs{}; + available_inputs_ptr = std::make_unique(additional_inputs_at_end.size()); + + active_streams = additional_inputs_at_end.size(); for (size_t i = 0; i < additional_inputs_at_end.size(); ++i) unprepared_inputs.emplace(additional_inputs_at_end[i], i); @@ -295,57 +300,37 @@ class ParallelInputsProcessor unprepared_input.in->readPrefix(); - { - std::lock_guard lock(available_inputs_mutex); - available_inputs.push(unprepared_input); - } + available_inputs_ptr->push(unprepared_input); } while (!finish) /// You may need to stop work earlier than all sources run out. { InputData input; - /// Select the next source. + if (!available_inputs_ptr->pop(input)) { - std::lock_guard lock(available_inputs_mutex); - - /// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.) - if (available_inputs.empty()) - break; - - input = available_inputs.front(); - - /// We remove the source from the queue of available sources. - available_inputs.pop(); + // All input streams are exhausted. + break; } /// The main work. Block block = input.in->read(); - { - if (finish) - break; + if (finish) + break; - /// If this source is not run out yet, then put the resulting block in the ready queue. + if (block) + { + available_inputs_ptr->push(input); + publishPayload(input.in, block, thread_num); + } + else + { + if (0 == --active_streams) { - std::lock_guard lock(available_inputs_mutex); - - if (block) - { - available_inputs.push(input); - } - else - { - if (available_inputs.empty()) - break; - } - } - - if (finish) + available_inputs_ptr->finish(); break; - - if (block) - publishPayload(input.in, block, thread_num); + } } } } @@ -375,8 +360,9 @@ class ParallelInputsProcessor * * Therefore, a queue is used. This can be improved in the future. 
*/ - using AvailableInputs = std::queue; - AvailableInputs available_inputs; + using AvailableInputs = MPMCQueue; + using AvailableInputsPtr = std::unique_ptr; + AvailableInputsPtr available_inputs_ptr; /** For parallel preparing (readPrefix) child streams. * First, streams are located here. @@ -385,9 +371,6 @@ class ParallelInputsProcessor using UnpreparedInputs = std::queue; UnpreparedInputs unprepared_inputs; - /// For operations with available_inputs. - std::mutex available_inputs_mutex; - /// For operations with unprepared_inputs. std::mutex unprepared_inputs_mutex; @@ -397,6 +380,8 @@ class ParallelInputsProcessor std::condition_variable wait_first_done; size_t running_first{0}; + /// How many active input streams. + std::atomic active_streams{0}; /// How many sources ran out. std::atomic active_threads{0}; /// Finish the threads work (before the sources run out). From 3fc123024b9cbc5b89a3a5d83068b695404a282e Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 6 Jul 2022 13:21:13 +0800 Subject: [PATCH 07/16] refine code Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 173 ++++++++---------- 1 file changed, 75 insertions(+), 98 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index a84e883056f..e3d4cf921ad 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -101,13 +101,10 @@ class ParallelInputsProcessor , additional_inputs_at_end(additional_inputs_at_end_) , max_threads(std::min(std::max(inputs_.size(), additional_inputs_at_end_.size()), max_threads_)) , handler(handler_) - , available_inputs_ptr(std::make_unique(inputs_.size())) + , working_inputs(inputs_) + , working_additional_inputs(additional_inputs_at_end_) , log(log_) - { - active_streams = inputs_.size(); - for (size_t i = 0; i < inputs_.size(); ++i) - unprepared_inputs.emplace(inputs_[i], i); - } + {} ~ParallelInputsProcessor() { @@ 
-127,10 +124,6 @@ class ParallelInputsProcessor if (!thread_manager) thread_manager = newThreadManager(); active_threads = max_threads; - { - std::lock_guard lock(running_first_mutex); - running_first = max_threads; - } for (size_t i = 0; i < max_threads; ++i) thread_manager->schedule(true, handler.getName(), [this, i] { this->thread(i); }); } @@ -140,6 +133,9 @@ class ParallelInputsProcessor { finish = true; + working_inputs.available_inputs.cancel(); + working_additional_inputs.available_inputs.cancel(); + for (auto & input : inputs) { cancelStream(input, kill); @@ -185,6 +181,49 @@ class ParallelInputsProcessor {} }; + struct WorkingInputs + { + explicit WorkingInputs(const BlockInputStreams & inputs_) + : available_inputs(inputs_.size()) + , active_inputs(inputs_.size()) + { + for (size_t i = 0; i < inputs_.size(); ++i) + unprepared_inputs.emplace(inputs_[i], i); + } + /** A set of available sources that are not currently processed by any thread. + * Each thread takes one source from this set, takes a block out of the source (at this moment the source does the calculations) + * and (if the source is not run out), puts it back into the set of available sources. + * + * The question arises what is better to use: + * - the queue (just processed source will be processed the next time later than the rest) + * - stack (just processed source will be processed as soon as possible). + * + * The stack is better than the queue when you need to do work on reading one source more consequentially, + * and theoretically, this allows you to achieve more consequent/consistent reads from the disk. + * + * But when using the stack, there is a problem with distributed query processing: + * data is read only from a part of the servers, and on the other servers + * a timeout occurs during send, and the request processing ends with an exception. + * + * Therefore, a queue is used. This can be improved in the future. 
+ */ + using AvailableInputs = MPMCQueue; + AvailableInputs available_inputs; + + /// How many active input streams. + std::atomic active_inputs; + + /** For parallel preparing (readPrefix) child streams. + * First, streams are located here. + * After a stream was prepared, it is moved to "available_inputs" for reading. + */ + using UnpreparedInputs = std::queue; + UnpreparedInputs unprepared_inputs; + + /// For operations with unprepared_inputs. + std::mutex unprepared_inputs_mutex; + }; + void cancelStream(const BlockInputStreamPtr & stream, bool kill) { if (auto * p_stream = dynamic_cast(&*stream)) @@ -217,46 +256,11 @@ class ParallelInputsProcessor void thread(size_t thread_num) { - /// Handle `inputs`. - work(thread_num); + work(thread_num, working_inputs); handler.onFinishThread(thread_num); - if (additional_inputs_at_end.empty()) - { - if (0 == --active_threads) - { - handler.onFinish(); - } - return; - } - - { - std::unique_lock lock(running_first_mutex); - if (0 == --running_first) - { - /// Only one thread can go here so don't need to hold `unprepared_inputs_mutex` - /// or `unprepared_inputs_mutex` lock. - /// If an error has occurred, the `unprepared_inputs` and `available_inputs` may not be empty. - unprepared_inputs = UnpreparedInputs{}; - available_inputs_ptr = std::make_unique(additional_inputs_at_end.size()); - - active_streams = additional_inputs_at_end.size(); - for (size_t i = 0; i < additional_inputs_at_end.size(); ++i) - unprepared_inputs.emplace(additional_inputs_at_end[i], i); - - wait_first_done.notify_all(); - } - else - { - wait_first_done.wait(lock, [this] { - return running_first == 0; - }); - } - } - - /// Handle `additional_inputs_at_end`. 
- work(thread_num); + work(thread_num, working_additional_inputs); if (0 == --active_threads) { @@ -264,13 +268,13 @@ class ParallelInputsProcessor } } - void work(size_t thread_num) + void work(size_t thread_num, WorkingInputs & work) { std::exception_ptr exception; try { - loop(thread_num); + loop(thread_num, work); } catch (...) { @@ -283,33 +287,42 @@ class ParallelInputsProcessor } } - void loop(size_t thread_num) + /// This function may be called in different threads. + /// If finish is not true and no exception occurs, we can ensure that the work is + /// all done when the function returns in any thread. + void loop(size_t thread_num, WorkingInputs & work) { + if (work.active_inputs == 0) + { + return; + } + while (!finish) { InputData unprepared_input; { - std::lock_guard lock(unprepared_inputs_mutex); + std::lock_guard lock(work.unprepared_inputs_mutex); - if (unprepared_inputs.empty()) + if (work.unprepared_inputs.empty()) break; - unprepared_input = unprepared_inputs.front(); - unprepared_inputs.pop(); + unprepared_input = work.unprepared_inputs.front(); + work.unprepared_inputs.pop(); } unprepared_input.in->readPrefix(); - available_inputs_ptr->push(unprepared_input); + work.available_inputs.push(unprepared_input); } while (!finish) /// You may need to stop work earlier than all sources run out. { InputData input; - if (!available_inputs_ptr->pop(input)) + if (!work.available_inputs.pop(input)) { - // All input streams are exhausted. + /// All input streams are exhausted. + /// Or an exception occurred and the queue was cancelled. 
break; } @@ -321,14 +334,14 @@ class ParallelInputsProcessor if (block) { - available_inputs_ptr->push(input); + work.available_inputs.push(input); publishPayload(input.in, block, thread_num); } else { - if (0 == --active_streams) + if (0 == --work.active_inputs) { - available_inputs_ptr->finish(); + work.available_inputs.finish(); break; } } @@ -343,45 +356,9 @@ class ParallelInputsProcessor std::shared_ptr thread_manager; - /** A set of available sources that are not currently processed by any thread. - * Each thread takes one source from this set, takes a block out of the source (at this moment the source does the calculations) - * and (if the source is not run out), puts it back into the set of available sources. - * - * The question arises what is better to use: - * - the queue (just processed source will be processed the next time later than the rest) - * - stack (just processed source will be processed as soon as possible). - * - * The stack is better than the queue when you need to do work on reading one source more consequentially, - * and theoretically, this allows you to achieve more consequent/consistent reads from the disk. - * - * But when using the stack, there is a problem with distributed query processing: - * data is read only from a part of the servers, and on the other servers - * a timeout occurs during send, and the request processing ends with an exception. - * - * Therefore, a queue is used. This can be improved in the future. - */ - using AvailableInputs = MPMCQueue; - using AvailableInputsPtr = std::unique_ptr; - AvailableInputsPtr available_inputs_ptr; - - /** For parallel preparing (readPrefix) child streams. - * First, streams are located here. - * After a stream was prepared, it is moved to "available_inputs" for reading. - */ - using UnpreparedInputs = std::queue; - UnpreparedInputs unprepared_inputs; - - /// For operations with unprepared_inputs. 
- std::mutex unprepared_inputs_mutex; - - /// For waiting all `inputs` sources work done. - /// After that, the `additional_inputs_at_end` sources can be processed. - std::mutex running_first_mutex; - std::condition_variable wait_first_done; - size_t running_first{0}; + WorkingInputs working_inputs; + WorkingInputs working_additional_inputs; - /// How many active input streams. - std::atomic active_streams{0}; /// How many sources ran out. std::atomic active_threads{0}; /// Finish the threads work (before the sources run out). From 82a50950470a5c8e2f8f2b25f9675c817e24833c Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 6 Jul 2022 15:58:10 +0800 Subject: [PATCH 08/16] update UnionBlockInputStream Signed-off-by: gengliqi --- dbms/src/DataStreams/UnionBlockInputStream.h | 7 ++- dbms/src/DataStreams/tests/union_stream2.cpp | 2 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 8 ++-- .../Flash/Coprocessor/InterpreterUtils.cpp | 46 +++---------------- dbms/src/Flash/tests/bench_exchange.cpp | 6 +-- dbms/src/Flash/tests/bench_window.cpp | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 10 +--- .../InterpreterSelectWithUnionQuery.cpp | 2 +- 8 files changed, 21 insertions(+), 62 deletions(-) diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 02688d0e09b..a8ceb7d6dff 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -94,20 +94,19 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream public: UnionBlockInputStream( BlockInputStreams inputs, - BlockInputStreamPtr additional_input_at_end, + BlockInputStreams additional_inputs_at_end, size_t max_threads, const String & req_id, ExceptionCallback exception_callback_ = ExceptionCallback()) : output_queue(std::min(inputs.size(), max_threads) * 5) // reduce contention , log(Logger::get(NAME, req_id)) , handler(*this) - , processor(inputs, additional_input_at_end ? 
BlockInputStreams{additional_input_at_end} : BlockInputStreams{}, max_threads, handler, log) + , processor(inputs, additional_inputs_at_end, max_threads, handler, log) , exception_callback(exception_callback_) { // TODO: assert capacity of output_queue is not less than processor.getMaxThreads() children = inputs; - if (additional_input_at_end) - children.push_back(additional_input_at_end); + children.insert(children.end(), additional_inputs_at_end.begin(), additional_inputs_at_end.end()); size_t num_children = children.size(); if (num_children > 1) diff --git a/dbms/src/DataStreams/tests/union_stream2.cpp b/dbms/src/DataStreams/tests/union_stream2.cpp index f939cda4e14..fb3f7238414 100644 --- a/dbms/src/DataStreams/tests/union_stream2.cpp +++ b/dbms/src/DataStreams/tests/union_stream2.cpp @@ -51,7 +51,7 @@ try for (size_t i = 0, size = streams.size(); i < size; ++i) streams[i] = std::make_shared(streams[i]); - BlockInputStreamPtr stream = std::make_shared>(streams, nullptr, settings.max_threads, /*req_id=*/""); + BlockInputStreamPtr stream = std::make_shared>(streams, BlockInputStreams{}, settings.max_threads, /*req_id=*/""); stream = std::make_shared(stream, 10, 0, ""); WriteBufferFromFileDescriptor wb(STDERR_FILENO); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 4ba2ba6c292..6266737b11c 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -383,7 +383,7 @@ void DAGQueryBlockInterpreter::executeAggregation( is_final_agg); /// If there are several sources, then we perform parallel aggregation - if (pipeline.streams.size() > 1) + if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) { const Settings & settings = context.getSettingsRef(); pipeline.firstStream() = std::make_shared( @@ -403,14 +403,14 @@ void DAGQueryBlockInterpreter::executeAggregation( } else { - 
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); BlockInputStreams inputs; if (!pipeline.streams.empty()) inputs.push_back(pipeline.firstStream()); else pipeline.streams.resize(1); - if (stream_with_non_joined_data) - inputs.push_back(stream_with_non_joined_data); + if (!pipeline.streams_with_non_joined_data.empty()) + inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); + pipeline.firstStream() = std::make_shared( std::make_shared(inputs, log->identifier()), params, diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index c9810454218..7b590b0f887 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -39,32 +39,6 @@ void restoreConcurrency( } } -BlockInputStreamPtr combinedNonJoinedDataStream( - DAGPipeline & pipeline, - size_t max_threads, - const LoggerPtr & log, - bool ignore_block) -{ - BlockInputStreamPtr ret = nullptr; - if (pipeline.streams_with_non_joined_data.size() == 1) - ret = pipeline.streams_with_non_joined_data.at(0); - else if (pipeline.streams_with_non_joined_data.size() > 1) - { - if (ignore_block) - { - ret = std::make_shared(pipeline.streams_with_non_joined_data, nullptr, max_threads, log->identifier()); - ret->setExtraInfo("combine non joined(ignore block)"); - } - else - { - ret = std::make_shared(pipeline.streams_with_non_joined_data, nullptr, max_threads, log->identifier()); - ret->setExtraInfo("combine non joined"); - } - } - pipeline.streams_with_non_joined_data.clear(); - return ret; -} - void executeUnion( DAGPipeline & pipeline, size_t max_streams, @@ -74,20 +48,12 @@ void executeUnion( { if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty()) return; - auto non_joined_data_stream = combinedNonJoinedDataStream(pipeline, max_streams, log, ignore_block); - if (!pipeline.streams.empty()) - { - if (ignore_block) - 
pipeline.firstStream() = std::make_shared(pipeline.streams, non_joined_data_stream, max_streams, log->identifier()); - else - pipeline.firstStream() = std::make_shared(pipeline.streams, non_joined_data_stream, max_streams, log->identifier()); - pipeline.firstStream()->setExtraInfo(extra_info); - pipeline.streams.resize(1); - } - else if (non_joined_data_stream != nullptr) - { - pipeline.streams.push_back(non_joined_data_stream); - } + if (ignore_block) + pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + else + pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + pipeline.firstStream()->setExtraInfo(extra_info); + pipeline.streams.resize(1); } ExpressionActionsPtr generateProjectExpressionActions( diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp index fbb53bfd4a4..cbbdf060580 100644 --- a/dbms/src/Flash/tests/bench_exchange.cpp +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -215,7 +215,7 @@ std::vector ReceiverHelper::buildExchangeReceiverStream() BlockInputStreamPtr ReceiverHelper::buildUnionStream() { auto streams = buildExchangeReceiverStream(); - return std::make_shared>(streams, nullptr, concurrency, /*req_id=*/""); + return std::make_shared>(streams, BlockInputStreams{}, concurrency, /*req_id=*/""); } void ReceiverHelper::finish() @@ -290,7 +290,7 @@ BlockInputStreamPtr SenderHelper::buildUnionStream( send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); } - return std::make_shared>(send_streams, nullptr, concurrency, /*req_id=*/""); + return std::make_shared>(send_streams, BlockInputStreams{}, concurrency, /*req_id=*/""); } BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, const std::vector & blocks) @@ -312,7 +312,7 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, 
const std: send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); } - return std::make_shared>(send_streams, nullptr, concurrency, /*req_id=*/""); + return std::make_shared>(send_streams, BlockInputStreams{}, concurrency, /*req_id=*/""); } void SenderHelper::finish() diff --git a/dbms/src/Flash/tests/bench_window.cpp b/dbms/src/Flash/tests/bench_window.cpp index da9df20fdf3..80a6511ccfe 100644 --- a/dbms/src/Flash/tests/bench_window.cpp +++ b/dbms/src/Flash/tests/bench_window.cpp @@ -71,7 +71,7 @@ class WindowFunctionBench : public ExchangeBench pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, 8192, 0, "mock_executor_id_squashing"); }); - receiver_stream = std::make_shared>(pipeline.streams, nullptr, concurrency, /*req_id=*/""); + receiver_stream = std::make_shared>(pipeline.streams, BlockInputStreams{}, concurrency, /*req_id=*/""); } tipb::Window window; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 02704142d11..8028306bcb8 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -989,15 +989,9 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre /// If there are several sources, then we perform parallel aggregation if (pipeline.streams.size() > 1) { - BlockInputStreams additional_inputs; - if (pipeline.stream_with_non_joined_data) - { - additional_inputs.push_back(pipeline.stream_with_non_joined_data); - } - pipeline.firstStream() = std::make_shared( pipeline.streams, - additional_inputs, + BlockInputStreams{pipeline.stream_with_non_joined_data}, params, file_provider, final, @@ -1255,7 +1249,7 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline) { pipeline.firstStream() = std::make_shared>( pipeline.streams, - pipeline.stream_with_non_joined_data, + BlockInputStreams{pipeline.stream_with_non_joined_data}, max_streams, 
/*req_id=*/""); pipeline.stream_with_non_joined_data = nullptr; diff --git a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index 5e73b1e5f3e..076c290cc9d 100644 --- a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -224,7 +224,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute() } else { - result_stream = std::make_shared>(nested_streams, nullptr, settings.max_threads, /*req_id=*/""); + result_stream = std::make_shared>(nested_streams, BlockInputStreams{}, settings.max_threads, /*req_id=*/""); nested_streams.clear(); } From b44539cd9ac41fd9e326ccacd4bd7f25517921ce Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 6 Jul 2022 17:45:30 +0800 Subject: [PATCH 09/16] tiny fix Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 40 +++++++++---------- dbms/src/DataStreams/UnionBlockInputStream.h | 2 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 4 +- .../Flash/Coprocessor/InterpreterUtils.cpp | 7 +++- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index e3d4cf921ad..193bad3a17d 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -85,7 +85,7 @@ class ParallelInputsProcessor { public: /** additional_inputs_at_end - if not empty, - * then the blocks from thes sources will start to be processed only after all other sources are processed. + * then the blocks from the sources will start to be processed only after all other sources are processed. 
* * Intended for implementation of FULL and RIGHT JOIN * - where you must first make JOIN in parallel, while noting which keys are not found, @@ -136,15 +136,8 @@ class ParallelInputsProcessor working_inputs.available_inputs.cancel(); working_additional_inputs.available_inputs.cancel(); - for (auto & input : inputs) - { - cancelStream(input, kill); - } - - for (auto & input : additional_inputs_at_end) - { - cancelStream(input, kill); - } + cancelStreams(inputs, kill); + cancelStreams(additional_inputs_at_end, kill); } /// Wait until all threads are finished, before the destructor. @@ -174,7 +167,9 @@ class ParallelInputsProcessor BlockInputStreamPtr in; size_t i; /// The source number (for debugging). - InputData() {} + InputData() + : i(0) + {} InputData(const BlockInputStreamPtr & in_, size_t i_) : in(in_) , i(i_) @@ -224,21 +219,24 @@ class ParallelInputsProcessor std::mutex unprepared_inputs_mutex; }; - void cancelStream(const BlockInputStreamPtr & stream, bool kill) + void cancelStreams(const BlockInputStreams & streams, bool kill) { - if (auto * p_stream = dynamic_cast(&*stream)) + for (const auto & input : streams) { - try + if (auto * p_child = dynamic_cast(&*input)) { - p_stream->cancel(kill); - } - catch (...) - { - /** If you can not ask one or more sources to stop. + try + { + p_child->cancel(kill); + } + catch (...) + { + /** If you can not ask one or more sources to stop. * (for example, the connection is broken for distributed query processing) * - then do not care. */ - LOG_FMT_ERROR(log, "Exception while cancelling {}", p_stream->getName()); + LOG_FMT_ERROR(log, "Exception while cancelling {}", p_child->getName()); + } } } } @@ -346,6 +344,8 @@ class ParallelInputsProcessor } } } + + // Should read_suffix be called here? 
} const BlockInputStreams inputs; diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index a8ceb7d6dff..ffcc8d77c10 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -98,7 +98,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream size_t max_threads, const String & req_id, ExceptionCallback exception_callback_ = ExceptionCallback()) - : output_queue(std::min(inputs.size(), max_threads) * 5) // reduce contention + : output_queue(std::min(std::max(inputs.size(), additional_inputs_at_end.size()), max_threads) * 5) // reduce contention , log(Logger::get(NAME, req_id)) , handler(*this) , processor(inputs, additional_inputs_at_end, max_threads, handler, log) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 6266737b11c..c234cda79e5 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -386,7 +386,7 @@ void DAGQueryBlockInterpreter::executeAggregation( if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) { const Settings & settings = context.getSettingsRef(); - pipeline.firstStream() = std::make_shared( + BlockInputStreamPtr stream = std::make_shared( pipeline.streams, pipeline.streams_with_non_joined_data, params, @@ -397,6 +397,8 @@ void DAGQueryBlockInterpreter::executeAggregation( log->identifier()); pipeline.streams.resize(1); pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); + // should record for agg before restore concurrency. See #3804. 
recordProfileStreams(pipeline, query_block.aggregation_name); restorePipelineConcurrency(pipeline); diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 7b590b0f887..aabd4b7361f 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -48,12 +48,15 @@ void executeUnion( { if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty()) return; + BlockInputStreamPtr stream; if (ignore_block) - pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); else - pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); pipeline.firstStream()->setExtraInfo(extra_info); pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); } ExpressionActionsPtr generateProjectExpressionActions( From bbf43e0edaaa98f27613039e467288b87ad017e6 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 7 Jul 2022 10:32:11 +0800 Subject: [PATCH 10/16] fix tests Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 27 +++++++------------ .../Flash/Coprocessor/InterpreterUtils.cpp | 2 +- dbms/src/TestUtils/mockExecutor.h | 2 +- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 193bad3a17d..24aa8dbcb70 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -295,41 +295,32 @@ class ParallelInputsProcessor return; } + InputData 
input; + while (!finish) { - InputData unprepared_input; { std::lock_guard lock(work.unprepared_inputs_mutex); if (work.unprepared_inputs.empty()) break; - unprepared_input = work.unprepared_inputs.front(); + input = work.unprepared_inputs.front(); work.unprepared_inputs.pop(); } - unprepared_input.in->readPrefix(); + input.in->readPrefix(); - work.available_inputs.push(unprepared_input); + work.available_inputs.push(input); } - while (!finish) /// You may need to stop work earlier than all sources run out. + /// The condition is false when all input streams are exhausted or + /// an exception occurred then the queue was cancelled. + while (work.available_inputs.pop(input)) { - InputData input; - - if (!work.available_inputs.pop(input)) - { - /// All input streams are exhausted. - /// Or an exception occurred and the queue was cancelled. - break; - } - /// The main work. Block block = input.in->read(); - if (finish) - break; - if (block) { work.available_inputs.push(input); @@ -345,7 +336,7 @@ class ParallelInputsProcessor } } - // Should read_suffix be called here? + // Should readSuffix be called here? 
} const BlockInputStreams inputs; diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index aabd4b7361f..58d6f21a578 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -53,7 +53,7 @@ void executeUnion( stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); else stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); - pipeline.firstStream()->setExtraInfo(extra_info); + stream->setExtraInfo(extra_info); pipeline.streams.resize(1); pipeline.streams_with_non_joined_data.clear(); pipeline.firstStream() = std::move(stream); diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index bad92c4226d..7c12ab3b302 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -89,7 +89,7 @@ class DAGRequestBuilder DAGRequestBuilder & exchangeSender(tipb::ExchangeType exchange_type); - // Currentlt only support inner join, left join and right join. + // Currently only support inner join, left join and right join. // TODO support more types of join. 
DAGRequestBuilder & join(const DAGRequestBuilder & right, MockAsts exprs); DAGRequestBuilder & join(const DAGRequestBuilder & right, MockAsts exprs, ASTTableJoin::Kind kind); From 56372881e0188a8376b7f04187fbd5832b779bb4 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 7 Jul 2022 11:40:28 +0800 Subject: [PATCH 11/16] address comments Signed-off-by: gengliqi --- dbms/src/DataStreams/ParallelInputsProcessor.h | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 24aa8dbcb70..72438f5ca57 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -255,11 +255,10 @@ class ParallelInputsProcessor void thread(size_t thread_num) { work(thread_num, working_inputs); + work(thread_num, working_additional_inputs); handler.onFinishThread(thread_num); - work(thread_num, working_additional_inputs); - if (0 == --active_threads) { handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. @@ -314,8 +313,8 @@ class ParallelInputsProcessor work.available_inputs.push(input); } - /// The condition is false when all input streams are exhausted or - /// an exception occurred then the queue was cancelled. + // The condition is false when all input streams are exhausted or + // an exception occurred then the queue was cancelled. while (work.available_inputs.pop(input)) { /// The main work. @@ -335,8 +334,6 @@ class ParallelInputsProcessor } } } - - // Should readSuffix be called here? 
} const BlockInputStreams inputs; From 6ecf88da7ed4286f5139da5b799610594c93ba38 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 7 Jul 2022 15:05:40 +0800 Subject: [PATCH 12/16] add interpreter test Signed-off-by: gengliqi --- dbms/src/Debug/astToExecutor.cpp | 2 +- dbms/src/Flash/tests/gtest_interpreter.cpp | 374 ++++++++++++++------- dbms/src/TestUtils/mockExecutor.h | 1 + 3 files changed, 249 insertions(+), 128 deletions(-) diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 7d1f3bc7209..78f2485beb6 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -1540,7 +1540,7 @@ ExecutorPtr compileAggregation(ExecutorPtr input, size_t & executor_index, ASTPt } TiDB::ColumnInfo ci; - if (func->name == "count") + if (func->name == "count" || func->name == "sum") { ci.tp = TiDB::TypeLongLong; ci.flag = TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull; diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index ba7d8fd15ee..fb7e89671a6 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -31,8 +31,8 @@ class InterpreterExecuteTest : public DB::tests::ExecutorTest context.addMockTable({"test_db", "r_table"}, {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "l_table"}, {{"l_a", TiDB::TP::TypeLong}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); context.addExchangeRelationSchema("sender_1", {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); - context.addExchangeRelationSchema("sender_l", {{"l_a", TiDB::TP::TypeString}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); - context.addExchangeRelationSchema("sender_r", {{"r_a", TiDB::TP::TypeString}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + 
context.addExchangeRelationSchema("sender_l", {{"l_a", TiDB::TP::TypeLong}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_r", {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); } }; @@ -212,47 +212,6 @@ Union: ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } - // Join Source. - DAGRequestBuilder table1 = context.scan("test_db", "r_table"); - DAGRequestBuilder table2 = context.scan("test_db", "l_table"); - DAGRequestBuilder table3 = context.scan("test_db", "r_table"); - DAGRequestBuilder table4 = context.scan("test_db", "l_table"); - - request = table1.join( - table2.join( - table3.join(table4, - {col("join_c")}, - ASTTableJoin::Kind::Left), - {col("join_c")}, - ASTTableJoin::Kind::Left), - {col("join_c")}, - ASTTableJoin::Kind::Left) - .build(context); - { - String expected = R"( -CreatingSets - Union: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - MockTableScan - Union x 2: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - Expression: - HashJoinProbe: - Expression: - MockTableScan - Union: - Expression x 10: - Expression: - HashJoinProbe: - Expression: - MockTableScan)"; - ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - } - request = context.receive("sender_1") .project({"s1", "s2", "s3"}) .project({"s1", "s2"}) @@ -298,90 +257,6 @@ Union: MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } - - // only join + ExchangeReceiver - DAGRequestBuilder receiver1 = context.receive("sender_l"); - DAGRequestBuilder receiver2 = context.receive("sender_r"); - DAGRequestBuilder receiver3 = context.receive("sender_l"); - DAGRequestBuilder receiver4 = context.receive("sender_r"); - - request = receiver1.join( - receiver2.join( - receiver3.join(receiver4, - {col("join_c")}, - ASTTableJoin::Kind::Left), - {col("join_c")}, - ASTTableJoin::Kind::Left), - 
{col("join_c")}, - ASTTableJoin::Kind::Left) - .build(context); - { - String expected = R"( -CreatingSets - Union: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - MockExchangeReceiver - Union x 2: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - Expression: - HashJoinProbe: - Expression: - MockExchangeReceiver - Union: - Expression x 10: - Expression: - HashJoinProbe: - Expression: - MockExchangeReceiver)"; - ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - } - - // join + receiver + sender - // TODO: Find a way to write the request easier. - DAGRequestBuilder receiver5 = context.receive("sender_l"); - DAGRequestBuilder receiver6 = context.receive("sender_r"); - DAGRequestBuilder receiver7 = context.receive("sender_l"); - DAGRequestBuilder receiver8 = context.receive("sender_r"); - request = receiver5.join( - receiver6.join( - receiver7.join(receiver8, - {col("join_c")}, - ASTTableJoin::Kind::Left), - {col("join_c")}, - ASTTableJoin::Kind::Left), - {col("join_c")}, - ASTTableJoin::Kind::Left) - .exchangeSender(tipb::PassThrough) - .build(context); - { - String expected = R"( -CreatingSets - Union: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - MockExchangeReceiver - Union x 2: - HashJoinBuildBlockInputStream x 10: , join_kind = Left - Expression: - Expression: - Expression: - HashJoinProbe: - Expression: - MockExchangeReceiver - Union: - MockExchangeSender x 10 - Expression: - Expression: - HashJoinProbe: - Expression: - MockExchangeReceiver)"; - ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - } } CATCH @@ -465,5 +340,250 @@ Union: } CATCH +TEST_F(InterpreterExecuteTest, Join) +try +{ + // TODO: Find a way to write the request easier. + { + // Join Source. 
+ DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + DAGRequestBuilder table3 = context.scan("test_db", "r_table"); + DAGRequestBuilder table4 = context.scan("test_db", "l_table"); + + auto request = table1.join( + table2.join( + table3.join(table4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockTableScan + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + { + // only join + ExchangeReceiver + DAGRequestBuilder receiver1 = context.receive("sender_l"); + DAGRequestBuilder receiver2 = context.receive("sender_r"); + DAGRequestBuilder receiver3 = context.receive("sender_l"); + DAGRequestBuilder receiver4 = context.receive("sender_r"); + + auto request = receiver1.join( + receiver2.join( + receiver3.join(receiver4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + { + // join + receiver + sender + 
DAGRequestBuilder receiver1 = context.receive("sender_l"); + DAGRequestBuilder receiver2 = context.receive("sender_r"); + DAGRequestBuilder receiver3 = context.receive("sender_l"); + DAGRequestBuilder receiver4 = context.receive("sender_r"); + + auto request = receiver1.join( + receiver2.join( + receiver3.join(receiver4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .exchangeSender(tipb::PassThrough) + .build(context); + + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + MockExchangeSender x 10 + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +TEST_F(InterpreterExecuteTest, JoinThenAgg) +try +{ + { + // Left Join. 
+ DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + + auto request = table1.join( + table2, + {col("join_c")}, + ASTTableJoin::Kind::Left) + .aggregation({Max(col("r_a"))}, {col("join_c")}) + .build(context); + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockTableScan + Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + { + // Right Join + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + + auto request = table1.join( + table2, + {col("join_c")}, + ASTTableJoin::Kind::Right) + .aggregation({Max(col("r_a"))}, {col("join_c")}) + .build(context); + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Right + Expression: + Expression: + MockTableScan + Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + HashJoinProbe: + Expression: + Expression: + MockTableScan + Expression x 10: + Expression: + NonJoined: )"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + { + // Right join + receiver + sender + DAGRequestBuilder receiver1 = context.receive("sender_l"); + DAGRequestBuilder receiver2 = context.receive("sender_r"); + + auto request = receiver1.join( + receiver2, + {col("join_c")}, + ASTTableJoin::Kind::Right) + .aggregation({Sum(col("r_a"))}, {col("join_c")}) + .exchangeSender(tipb::PassThrough) + .limit(10) + .build(context); + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 20: , join_kind = Right + Expression: + Expression: + MockExchangeReceiver + Union: + 
MockExchangeSender x 20 + SharedQuery: + Limit, limit = 10 + Union: + Limit x 20, limit = 10 + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 20, final: true + Expression x 20: + Expression: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver + Expression x 20: + Expression: + NonJoined: )"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); + } +} +CATCH + } // namespace tests } // namespace DB \ No newline at end of file diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 7c12ab3b302..f9ded853e61 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -185,6 +185,7 @@ MockWindowFrame buildDefaultRowsFrame(); #define Or(expr1, expr2) makeASTFunction("or", (expr1), (expr2)) #define NOT(expr) makeASTFunction("not", (expr)) #define Max(expr) makeASTFunction("max", (expr)) +#define Sum(expr) makeASTFunction("sum", (expr)) /// Window functions #define RowNumber() makeASTFunction("RowNumber") #define Rank() makeASTFunction("Rank") From f0d1f93b30999db803bc2e3e6c498371ca9194a5 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 7 Jul 2022 23:45:28 +0800 Subject: [PATCH 13/16] unprepared_inputs use MPMCQueue as well Signed-off-by: gengliqi --- .../src/DataStreams/ParallelInputsProcessor.h | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 72438f5ca57..2f8071b27a1 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -131,8 +131,6 @@ class ParallelInputsProcessor /// Ask all sources to stop earlier than they run out. 
void cancel(bool kill) { - finish = true; - working_inputs.available_inputs.cancel(); working_additional_inputs.available_inputs.cancel(); @@ -181,6 +179,7 @@ class ParallelInputsProcessor explicit WorkingInputs(const BlockInputStreams & inputs_) : available_inputs(inputs_.size()) , active_inputs(inputs_.size()) + , unprepared_inputs(inputs_.size()) { for (size_t i = 0; i < inputs_.size(); ++i) unprepared_inputs.emplace(inputs_[i], i); @@ -212,11 +211,8 @@ class ParallelInputsProcessor * First, streams are located here. * After a stream was prepared, it is moved to "available_inputs" for reading. */ - using UnpreparedInputs = std::queue; + using UnpreparedInputs = MPMCQueue; UnpreparedInputs unprepared_inputs; - - /// For operations with unprepared_inputs. - std::mutex unprepared_inputs_mutex; }; void cancelStreams(const BlockInputStreams & streams, bool kill) @@ -261,7 +257,7 @@ class ParallelInputsProcessor if (0 == --active_threads) { - handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. + handler.onFinish(); } } @@ -285,8 +281,8 @@ class ParallelInputsProcessor } /// This function may be called in different threads. - /// If finish is not true and no exception occurs, we can ensure that the work is - /// all done when the function returns in any thread. + /// If no exception occurs, we can ensure that the work is all done when the function + /// returns in any thread. 
void loop(size_t thread_num, WorkingInputs & work) { if (work.active_inputs == 0) @@ -296,18 +292,8 @@ class ParallelInputsProcessor InputData input; - while (!finish) + while (work.unprepared_inputs.tryPop(input)) { - { - std::lock_guard lock(work.unprepared_inputs_mutex); - - if (work.unprepared_inputs.empty()) - break; - - input = work.unprepared_inputs.front(); - work.unprepared_inputs.pop(); - } - input.in->readPrefix(); work.available_inputs.push(input); @@ -349,8 +335,6 @@ class ParallelInputsProcessor /// How many sources ran out. std::atomic active_threads{0}; - /// Finish the threads work (before the sources run out). - std::atomic finish{false}; /// Wait for the completion of all threads. std::atomic joined_threads{false}; From fa00bd0810f041b91054f4610c735039d670375d Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 8 Jul 2022 02:03:58 +0800 Subject: [PATCH 14/16] add sum Signed-off-by: gengliqi --- dbms/src/Debug/astToExecutor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 78f2485beb6..481eac65fe2 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -1540,12 +1540,12 @@ ExecutorPtr compileAggregation(ExecutorPtr input, size_t & executor_index, ASTPt } TiDB::ColumnInfo ci; - if (func->name == "count" || func->name == "sum") + if (func->name == "count") { ci.tp = TiDB::TypeLongLong; ci.flag = TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull; } - else if (func->name == "max" || func->name == "min" || func->name == "first_row") + else if (func->name == "max" || func->name == "min" || func->name == "first_row" || func->name == "sum") { ci = children_ci[0]; ci.flag &= ~TiDB::ColumnFlagNotNull; From 5e1cfa19deef6aae23067a3c2752b2168b94d606 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 8 Jul 2022 10:37:22 +0800 Subject: [PATCH 15/16] address comment Signed-off-by: gengliqi --- 
dbms/src/DataStreams/ParallelInputsProcessor.h | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index 2f8071b27a1..57ab37e1756 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -141,11 +141,11 @@ class ParallelInputsProcessor /// Wait until all threads are finished, before the destructor. void wait() { - if (joined_threads) - return; if (thread_manager) + { thread_manager->wait(); - joined_threads = true; + thread_manager.reset(); + } } size_t getNumActiveThreads() const @@ -335,8 +335,6 @@ class ParallelInputsProcessor /// How many sources ran out. std::atomic active_threads{0}; - /// Wait for the completion of all threads. - std::atomic joined_threads{false}; const LoggerPtr log; }; From 11367dabb9866467864556f864db39b678a7979d Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 8 Jul 2022 23:27:54 +0800 Subject: [PATCH 16/16] fix dag & select interpreter Signed-off-by: gengliqi --- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 7 ++- .../Flash/Coprocessor/InterpreterUtils.cpp | 39 +++++++++---- .../Interpreters/InterpreterSelectQuery.cpp | 58 +++++++++++-------- .../src/Interpreters/InterpreterSelectQuery.h | 8 +-- 4 files changed, 72 insertions(+), 40 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index c234cda79e5..bf695da34c1 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -395,6 +395,7 @@ void DAGQueryBlockInterpreter::executeAggregation( max_streams, settings.aggregation_memory_efficient_merge_threads ? 
static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), log->identifier()); + pipeline.streams.resize(1); pipeline.streams_with_non_joined_data.clear(); pipeline.firstStream() = std::move(stream); @@ -408,11 +409,13 @@ void DAGQueryBlockInterpreter::executeAggregation( BlockInputStreams inputs; if (!pipeline.streams.empty()) inputs.push_back(pipeline.firstStream()); - else - pipeline.streams.resize(1); + if (!pipeline.streams_with_non_joined_data.empty()) inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); + pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::make_shared( std::make_shared(inputs, log->identifier()), params, diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 58d6f21a578..6415d36389b 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -46,17 +46,34 @@ void executeUnion( bool ignore_block, const String & extra_info) { - if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty()) - return; - BlockInputStreamPtr stream; - if (ignore_block) - stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); - else - stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); - stream->setExtraInfo(extra_info); - pipeline.streams.resize(1); - pipeline.streams_with_non_joined_data.clear(); - pipeline.firstStream() = std::move(stream); + switch (pipeline.streams.size() + pipeline.streams_with_non_joined_data.size()) + { + case 0: + break; + case 1: + { + if (pipeline.streams.size() == 1) + break; + // streams_with_non_joined_data's size is 1. 
+ pipeline.streams.push_back(pipeline.streams_with_non_joined_data.at(0)); + pipeline.streams_with_non_joined_data.clear(); + break; + } + default: + { + BlockInputStreamPtr stream; + if (ignore_block) + stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + else + stream = std::make_shared(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier()); + stream->setExtraInfo(extra_info); + + pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); + break; + } + } } ExpressionActionsPtr generateProjectExpressionActions( diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 8028306bcb8..3514f915626 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -512,13 +512,13 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt { const auto & join = static_cast(*query.join()->table_join); if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right) - pipeline.stream_with_non_joined_data = expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin( + pipeline.streams_with_non_joined_data.push_back(expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin( pipeline.firstStream()->getHeader(), 0, 1, - settings.max_block_size); + settings.max_block_size)); - for (auto & stream : pipeline.streams) /// Applies to all sources except stream_with_non_joined_data. + for (auto & stream : pipeline.streams) /// Applies to all sources except streams_with_non_joined_data. 
stream = std::make_shared(stream, expressions.before_join, /*req_id=*/""); } @@ -603,7 +603,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt if (need_second_distinct_pass || query.limit_length || query.limit_by_expression_list - || pipeline.stream_with_non_joined_data) + || !pipeline.streams_with_non_joined_data.empty()) { need_merge_streams = true; } @@ -987,11 +987,11 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath()); /// If there are several sources, then we perform parallel aggregation - if (pipeline.streams.size() > 1) + if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) { - pipeline.firstStream() = std::make_shared( + auto stream = std::make_shared( pipeline.streams, - BlockInputStreams{pipeline.stream_with_non_joined_data}, + pipeline.streams_with_non_joined_data, params, file_provider, final, @@ -1001,19 +1001,21 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre : static_cast(settings.max_threads), /*req_id=*/""); - pipeline.stream_with_non_joined_data = nullptr; pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); } else { BlockInputStreams inputs; if (!pipeline.streams.empty()) inputs.push_back(pipeline.firstStream()); - else - pipeline.streams.resize(1); - if (pipeline.stream_with_non_joined_data) - inputs.push_back(pipeline.stream_with_non_joined_data); + if (!pipeline.streams_with_non_joined_data.empty()) + 
inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); + + pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); pipeline.firstStream() = std::make_shared( std::make_shared(inputs, /*req_id=*/""), @@ -1021,8 +1023,6 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre file_provider, final, /*req_id=*/""); - - pipeline.stream_with_non_joined_data = nullptr; } } @@ -1244,21 +1244,33 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or void InterpreterSelectQuery::executeUnion(Pipeline & pipeline) { - /// If there are still several streams, then we combine them into one - if (pipeline.hasMoreThanOneStream()) + switch (pipeline.streams.size() + pipeline.streams_with_non_joined_data.size()) { - pipeline.firstStream() = std::make_shared>( + case 0: + break; + case 1: + { + if (pipeline.streams.size() == 1) + break; + // streams_with_non_joined_data's size is 1. + pipeline.streams.push_back(pipeline.streams_with_non_joined_data.at(0)); + pipeline.streams_with_non_joined_data.clear(); + break; + } + default: + { + BlockInputStreamPtr stream = std::make_shared>( pipeline.streams, - BlockInputStreams{pipeline.stream_with_non_joined_data}, + pipeline.streams_with_non_joined_data, max_streams, /*req_id=*/""); - pipeline.stream_with_non_joined_data = nullptr; + ; + pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); + break; } - else if (pipeline.stream_with_non_joined_data) - { - pipeline.streams.push_back(pipeline.stream_with_non_joined_data); - pipeline.stream_with_non_joined_data = nullptr; } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 474ace7ee84..d1bcec2a3dd 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -95,7 +95,7 @@ class InterpreterSelectQuery : public IInterpreter * 
It has a special meaning, since reading from it should be done after reading from the main streams. * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream. */ - BlockInputStreamPtr stream_with_non_joined_data; + BlockInputStreams streams_with_non_joined_data; BlockInputStreamPtr & firstStream() { return streams.at(0); } @@ -105,13 +105,13 @@ class InterpreterSelectQuery : public IInterpreter for (auto & stream : streams) transform(stream); - if (stream_with_non_joined_data) - transform(stream_with_non_joined_data); + for (auto & stream : streams_with_non_joined_data) + transform(stream); } bool hasMoreThanOneStream() const { - return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1; + return streams.size() + streams_with_non_joined_data.size() > 1; } };