Skip to content

Commit ae2b254

Browse files
mrhhsgdataroaring
authored andcommitted
[fix](scan) catch exceptions thrown in scanner (#36101)
## Proposed changes The uncaught exceptions thrown in the scanner will cause the BE to crash. <!--Describe your changes.-->
1 parent d0ab33d commit ae2b254

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

be/src/vec/exec/scan/scanner_scheduler.cpp

+31-6
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,17 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
136136
}
137137

138138
scanner_delegate->_scanner->start_wait_worker_timer();
139-
auto s = ctx->thread_token->submit_func(
140-
[this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); });
139+
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
140+
auto status = [&] {
141+
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
142+
return Status::OK();
143+
}();
144+
145+
if (!status.ok()) {
146+
scanner_ref->set_status(status);
147+
ctx->append_block_to_queue(scanner_ref);
148+
}
149+
});
141150
if (!s.ok()) {
142151
scan_task->set_status(s);
143152
ctx->append_block_to_queue(scan_task);
@@ -157,16 +166,32 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
157166
is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
158167
auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
159168
if (scan_sched) {
160-
auto work_func = [this, scanner_ref = scan_task, ctx]() {
161-
this->_scanner_scan(ctx, scanner_ref);
169+
auto work_func = [scanner_ref = scan_task, ctx]() {
170+
auto status = [&] {
171+
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
172+
return Status::OK();
173+
}();
174+
175+
if (!status.ok()) {
176+
scanner_ref->set_status(status);
177+
ctx->append_block_to_queue(scanner_ref);
178+
}
162179
};
163180
SimplifiedScanTask simple_scan_task = {work_func, ctx};
164181
return scan_sched->submit_scan_task(simple_scan_task);
165182
}
166183

167184
PriorityThreadPool::Task task;
168-
task.work_function = [this, scanner_ref = scan_task, ctx]() {
169-
this->_scanner_scan(ctx, scanner_ref);
185+
task.work_function = [scanner_ref = scan_task, ctx]() {
186+
auto status = [&] {
187+
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
188+
return Status::OK();
189+
}();
190+
191+
if (!status.ok()) {
192+
scanner_ref->set_status(status);
193+
ctx->append_block_to_queue(scanner_ref);
194+
}
170195
};
171196
task.priority = nice;
172197
return thread_pool->offer(task)

0 commit comments

Comments
 (0)