Skip to content

Commit cc0f1ce

Browse files
qw4990winkyao
authored andcommitted
executor: handle OOM panic which not be recovered now in distSQL layer (#10545)
1 parent f7899de commit cc0f1ce

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

distsql/select_result.go

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package distsql
1515

1616
import (
17+
"fmt"
1718
"time"
1819

1920
"github.com/pingcap/errors"
@@ -25,8 +26,10 @@ import (
2526
"github.com/pingcap/tidb/types"
2627
"github.com/pingcap/tidb/util/chunk"
2728
"github.com/pingcap/tidb/util/codec"
29+
"github.com/pingcap/tidb/util/logutil"
2830
"github.com/pingcap/tidb/util/memory"
2931
"github.com/pingcap/tipb/go-tipb"
32+
"go.uber.org/zap"
3033
"golang.org/x/net/context"
3134
)
3235

@@ -80,6 +83,12 @@ func (r *selectResult) Fetch(ctx context.Context) {
8083
func (r *selectResult) fetch(ctx context.Context) {
8184
startTime := time.Now()
8285
defer func() {
86+
if c := recover(); c != nil {
87+
err := fmt.Errorf("%v", c)
88+
logutil.Logger(ctx).Error("OOM", zap.Error(err))
89+
r.results <- resultWithErr{err: err}
90+
}
91+
8392
close(r.results)
8493
duration := time.Since(startTime)
8594
metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds())

store/tikv/coprocessor.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -545,8 +545,8 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
545545
return
546546
}
547547

548-
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
549-
if worker.memTracker != nil {
548+
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse, checkOOM bool) (exit bool) {
549+
if worker.memTracker != nil && checkOOM {
550550
worker.memTracker.Consume(int64(resp.MemSize()))
551551
}
552552
select {
@@ -607,12 +607,24 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
607607

608608
// handleTask handles single copTask, sends the result to channel, retry automatically on error.
609609
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
610+
defer func() {
611+
r := recover()
612+
if r != nil {
613+
logutil.Logger(context.Background()).Error("copIteratorWork meet panic",
614+
zap.Reflect("r", r),
615+
zap.Stack("stack trace"))
616+
resp := &copResponse{err: errors.Errorf("%v", r)}
617+
// if panic has happened, set checkOOM to false to avoid another panic.
618+
worker.sendToRespCh(resp, task.respChan, false)
619+
}
620+
}()
621+
610622
remainTasks := []*copTask{task}
611623
for len(remainTasks) > 0 {
612624
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
613625
if err != nil {
614626
resp := &copResponse{err: errors.Trace(err)}
615-
worker.sendToRespCh(resp, respCh)
627+
worker.sendToRespCh(resp, respCh, true)
616628
return
617629
}
618630
if len(tasks) > 0 {
@@ -803,7 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copRespo
803815
}
804816
}
805817
}
806-
worker.sendToRespCh(resp, ch)
818+
worker.sendToRespCh(resp, ch, true)
807819
return nil, nil
808820
}
809821

0 commit comments

Comments
 (0)