Skip to content

Commit

Permalink
[fix](scanner) Fix deadlock when scanner submit failed (apache#40495)
Browse files Browse the repository at this point in the history
We have dead lock when submit scanner to scheduler failed.

pstack looks like
```txt
Thread 2012 (Thread 0x7f87363fb700 (LWP 4179707) "Pipe_normal [wo"):
#0  0x00007f8b8f3dc82d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8b8f3d5ad9 in pthread_mutex_lock () from /lib64/libpthread.so.0
#2  0x000055b20f333e7a in __gthread_mutex_lock (__mutex=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/x86_64-linux-gnu/c++/11/bits/gthr-default
.h:749
#3  std::mutex::lock (this=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:100
#4  std::lock_guard<std::mutex>::lock_guard (__m=..., this=<optimized out>) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:229
#5  doris::vectorized::ScannerContext::append_block_to_queue (this=<optimized out>, scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:234
#6  0x000055b20f32c0f9 in doris::vectorized::ScannerScheduler::submit (this=<optimized out>, ctx=..., scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:209
apache#7  0x000055b20f3338fc in doris::vectorized::ScannerContext::submit_scan_task (this=this@entry=0x7f8733d96010, scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:217
apache#8  0x000055b20f3346cd in doris::vectorized::ScannerContext::get_block_from_queue (this=0x7f8733d96010, state=<optimized out>, block=0x7f871f728de0, eos=0x7f871abce470, id=<optimized out>) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:290
apache#9  0x000055b214cb4f13 in doris::pipeline::ScanOperatorX<doris::pipeline::OlapScanLocalState>::get_block (this=<optimized out>, state=0x7f872f0eb400, block=0x7f8b8f3dc82d <__lll_lock_wait+29>, eos=0x7f871abce470) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.cpp:1292
apache#10 0x000055b2142b5772 in doris::pipeline::ScanOperatorX<doris::pipeline::OlapScanLocalState>::get_block_after_projects (this=0x80, state=0x0, block=0x7f8b8f3dc82d <__lll_lock_wait+29>, eos=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.h:363
apache#11 0x000055b2142e7880 in doris::pipeline::StatefulOperatorX<doris::pipeline::StreamingAggLocalState>::get_block (this=0x7f871f9bee00, state=0x7f872f0eb400, block=0x7f8716d49060, eos=0x7f87363f4937) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/operator.cpp:587
```
Deallock happens with following
```cpp
Status ScannerContext::get_block_from_queue {
     std::unique_lock l(_transfer_lock);
     ...
     if (scan_task->is_eos()) {
     ...
     } else {
          // resubmit current running scanner to read the next block
         submit_scan_task(scan_task);
     }
}

ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
     _scanner_scheduler->submit(shared_from_this(), scan_task);
}

void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                              std::shared_ptr<ScanTask> scan_task) {
    ...
    if (auto ret = sumbit_task(); !ret) {
        scan_task->set_status(Status::InternalError(
                "Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) +
                "|type:" + std::to_string(type)));
        ctx->append_block_to_queue(scan_task);
        return;
    }
}

void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
    ...
    std::lock_guard<std::mutex> l(_transfer_lock);
    ...
}
```
Since mutex in cpp is not re-enterable, so the scanner thread will
deadlock with itself.

This pr fix the problem by making `ScannerScheduler::submit` return a
Status instead of doing append failed task to the ScannerContext. The
caller itself will decide where resubmit the scanner or just abort the
execution of the query.
  • Loading branch information
zhiqiang-hhhh authored and gavinchou committed Sep 12, 2024
1 parent bc02f3f commit d926041
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 28 deletions.
44 changes: 33 additions & 11 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Status ScannerContext::init() {
for (int i = 0; i < _max_thread_num; ++i) {
std::weak_ptr<ScannerDelegate> next_scanner;
if (_scanners.try_dequeue(next_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
_num_running_scanners++;
}
}
Expand Down Expand Up @@ -181,10 +181,10 @@ bool ScannerContext::empty_in_queue(int id) {
return _blocks_queue.empty();
}

void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
_scanner_scheduler->submit(shared_from_this(), scan_task);
return _scanner_scheduler->submit(shared_from_this(), scan_task);
}

void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
Expand Down Expand Up @@ -232,10 +232,15 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
auto scan_task = _blocks_queue.front();
DCHECK(scan_task);

// The abnormal status of scanner may come from the execution of the scanner itself,
// or come from the scanner scheduler, such as TooManyTasks.
if (!scan_task->status_ok()) {
// TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
_process_status = scan_task->get_status();
_set_scanner_done();
return scan_task->get_status();
return _process_status;
}

if (!scan_task->cached_blocks.empty()) {
auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
scan_task->cached_blocks.pop_front();
Expand All @@ -248,13 +253,20 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
block->swap(*current_block);
return_free_block(std::move(current_block));
} else {
// This scan task do not have any cached blocks.
_blocks_queue.pop_front();
if (scan_task->is_eos()) { // current scanner is finished, and no more data to read
// current scanner is finished, and no more data to read
if (scan_task->is_eos()) {
_num_finished_scanners++;
std::weak_ptr<ScannerDelegate> next_scanner;
// submit one of the remaining scanners
if (_scanners.try_dequeue(next_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner));
if (!submit_status.ok()) {
_process_status = submit_status;
_set_scanner_done();
return _process_status;
}
} else {
// no more scanner to be scheduled
// `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
Expand All @@ -270,11 +282,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
} else {
// resubmit current running scanner to read the next block
submit_scan_task(scan_task);
Status submit_status = submit_scan_task(scan_task);
if (!submit_status.ok()) {
_process_status = submit_status;
_set_scanner_done();
return _process_status;
}
}
}
// scale up
_try_to_scale_up();
RETURN_IF_ERROR(_try_to_scale_up());
}

if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) {
Expand All @@ -289,7 +306,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
return Status::OK();
}

void ScannerContext::_try_to_scale_up() {
Status ScannerContext::_try_to_scale_up() {
// Four criteria to determine whether to increase the parallelism of the scanners
// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
Expand All @@ -306,7 +323,7 @@ void ScannerContext::_try_to_scale_up() {
// when _last_wait_duration_ratio > 0, it has scaled up before.
// we need to determine if the scale-up is effective:
// the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio`
return;
return Status::OK();
}

bool is_scale_up = false;
Expand All @@ -322,7 +339,10 @@ void ScannerContext::_try_to_scale_up() {
// get enough memory to launch one more scanner.
std::weak_ptr<ScannerDelegate> scale_up_scanner;
if (_scanners.try_dequeue(scale_up_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
// Just return error to caller.
// Because _try_to_scale_up is called under _transfer_lock locked, if we add the scanner
// to the block queue, we will get a deadlock.
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)));
_num_running_scanners++;
_scale_up_scanners_counter->update(1);
is_scale_up = true;
Expand All @@ -337,6 +357,8 @@ void ScannerContext::_try_to_scale_up() {
_total_wait_block_time = 0;
}
}

return Status::OK();
}

Status ScannerContext::validate_block_schema(Block* block) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
// set the next scanned block to `ScanTask::current_block`
// set the error state to `ScanTask::status`
// set the `eos` to `ScanTask::eos` if there is no more data in current scanner
void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
Status submit_scan_task(std::shared_ptr<ScanTask> scan_task);

// append the running scanner and its cached block to `_blocks_queue`
void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
Expand Down Expand Up @@ -184,7 +184,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
void _set_scanner_done();
void _try_to_scale_up();
Status _try_to_scale_up();

RuntimeState* _state = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
Expand Down
31 changes: 17 additions & 14 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,23 @@ Status ScannerScheduler::init(ExecEnv* env) {
return Status::OK();
}

void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
scan_task->last_submit_time = GetCurrentTimeNanos();
if (ctx->done()) {
return;
return Status::OK();
}
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
<< " maybe finished";
return;
return Status::OK();
}

if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
return Status::OK();
}

scanner_delegate->_scanner->start_wait_worker_timer();
Expand All @@ -152,13 +152,12 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
});
if (!s.ok()) {
scan_task->set_status(s);
ctx->append_block_to_queue(scan_task);
return;
return s;
}
} else {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
return Status::OK();
}

scanner_delegate->_scanner->start_wait_worker_timer();
Expand Down Expand Up @@ -186,14 +185,18 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return scan_sched->submit_scan_task(simple_scan_task);
};

if (auto ret = sumbit_task(); !ret) {
scan_task->set_status(Status::InternalError(
"Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) +
"|type:" + std::to_string(type)));
ctx->append_block_to_queue(scan_task);
return;
Status submit_status = sumbit_task();
if (!submit_status.ok()) {
// User will see TooManyTasks error. It looks like a more reasonable error.
Status scan_task_status = Status::TooManyTasks(
"Failed to submit scanner to scanner pool reason:" +
std::string(submit_status.msg()) + "|type:" + std::to_string(type));
scan_task->set_status(scan_task_status);
return scan_task_status;
}
}

return Status::OK();
}

std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ScannerScheduler {

[[nodiscard]] Status init(ExecEnv* env);

void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);

void stop();

Expand Down

0 comments on commit d926041

Please sign in to comment.