Skip to content

Commit

Permalink
try to improve locality
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <gengliqiii@gmail.com>
  • Loading branch information
gengliqi committed Jul 1, 2022
1 parent 858f851 commit 2ed4141
Showing 1 changed file with 7 additions and 28 deletions.
35 changes: 7 additions & 28 deletions dbms/src/DataStreams/ParallelInputsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ class ParallelInputsProcessor
}
}

InputData input;
while (!finish) /// You may need to stop work earlier than all sources run out.
{
InputData input;

/// Select the next source.
if (!input.in)
{
/// Select the next source.
std::lock_guard lock(available_inputs_mutex);

/// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.)
Expand All @@ -324,31 +324,10 @@ class ParallelInputsProcessor
/// The main work.
Block block = input.in->read();

{
if (finish)
break;

/// If this source is not run out yet, then put the resulting block in the ready queue.
{
std::lock_guard lock(available_inputs_mutex);

if (block)
{
available_inputs.push(input);
}
else
{
if (available_inputs.empty())
break;
}
}

if (finish)
break;

if (block)
publishPayload(input.in, block, thread_num);
}
if (block)
publishPayload(input.in, block, thread_num);
else
input = InputData();
}
}

Expand Down

0 comments on commit 2ed4141

Please sign in to comment.