diff --git a/distsql/select_result.go b/distsql/select_result.go index 797c977e68f57..db2ed3680596c 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,6 +14,7 @@ package distsql import ( + "fmt" "time" "github.com/pingcap/errors" @@ -25,8 +26,10 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -80,6 +83,12 @@ func (r *selectResult) Fetch(ctx context.Context) { func (r *selectResult) fetch(ctx context.Context) { startTime := time.Now() defer func() { + if c := recover(); c != nil { + err := fmt.Errorf("%v", c) + logutil.Logger(ctx).Error("OOM", zap.Error(err)) + r.results <- resultWithErr{err: err} + } + close(r.results) duration := time.Since(startTime) metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds()) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 2e1a1b021bf03..903dfd5ba4b91 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -545,8 +545,8 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) { return } -func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) { - if worker.memTracker != nil { +func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse, checkOOM bool) (exit bool) { + if worker.memTracker != nil && checkOOM { worker.memTracker.Consume(int64(resp.MemSize())) } select { @@ -607,12 +607,24 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { // handleTask handles single copTask, sends the result to channel, retry automatically on error. func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(context.Background()).Error("copIteratorWork meet panic", + zap.Reflect("r", r), + zap.Stack("stack trace")) + resp := &copResponse{err: errors.Errorf("%v", r)} + // if panic has happened, set checkOOM to false to avoid another panic. + worker.sendToRespCh(resp, task.respChan, false) + } + }() + remainTasks := []*copTask{task} for len(remainTasks) > 0 { tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh) if err != nil { resp := &copResponse{err: errors.Trace(err)} - worker.sendToRespCh(resp, respCh) + worker.sendToRespCh(resp, respCh, true) return } if len(tasks) > 0 { @@ -803,7 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copRespo } } } - worker.sendToRespCh(resp, ch) + worker.sendToRespCh(resp, ch, true) return nil, nil }