diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index b3480e79b76..11511d111b2 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -303,12 +303,12 @@ class ParallelInputsProcessor } } + InputData input; while (!finish) /// You may need to stop work earlier than all sources run out. { - InputData input; - - /// Select the next source. + if (!input.in) { + /// Select the next source. std::lock_guard lock(available_inputs_mutex); /// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.) @@ -324,31 +324,10 @@ class ParallelInputsProcessor /// The main work. Block block = input.in->read(); - { - if (finish) - break; - - /// If this source is not run out yet, then put the resulting block in the ready queue. - { - std::lock_guard lock(available_inputs_mutex); - - if (block) - { - available_inputs.push(input); - } - else - { - if (available_inputs.empty()) - break; - } - } - - if (finish) - break; - - if (block) - publishPayload(input.in, block, thread_num); - } + if (block) + publishPayload(input.in, block, thread_num); + else + input = InputData(); } }