From baf76b6b5145d7b44641935ad2f8da2a37918214 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 20 May 2019 19:46:34 +0800 Subject: [PATCH 1/2] executor: handle OOM panic which not be recovered now in distSQL layer --- distsql/select_result.go | 6 ++++++ store/tikv/coprocessor.go | 20 ++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 797c977e68f57..ae998477595db 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -80,6 +80,12 @@ func (r *selectResult) Fetch(ctx context.Context) { func (r *selectResult) fetch(ctx context.Context) { startTime := time.Now() defer func() { + if c := recover(); c != nil { + err := fmt.Errorf("%v", c) + logutil.Logger(ctx).Error("OOM", zap.Error(err)) + r.results <- resultWithErr{err: err} + } + close(r.results) duration := time.Since(startTime) metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds()) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 2e1a1b021bf03..903dfd5ba4b91 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -545,8 +545,8 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) { return } -func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) { - if worker.memTracker != nil { +func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse, checkOOM bool) (exit bool) { + if worker.memTracker != nil && checkOOM { worker.memTracker.Consume(int64(resp.MemSize())) } select { @@ -607,12 +607,24 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { // handleTask handles single copTask, sends the result to channel, retry automatically on error. func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(context.Background()).Error("copIteratorWork meet panic", + zap.Reflect("r", r), + zap.Stack("stack trace")) + resp := &copResponse{err: errors.Errorf("%v", r)} + // if panic has happened, set checkOOM to false to avoid another panic. + worker.sendToRespCh(resp, task.respChan, false) + } + }() + remainTasks := []*copTask{task} for len(remainTasks) > 0 { tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh) if err != nil { resp := &copResponse{err: errors.Trace(err)} - worker.sendToRespCh(resp, respCh) + worker.sendToRespCh(resp, respCh, true) return } if len(tasks) > 0 { @@ -803,7 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copRespo } } } - worker.sendToRespCh(resp, ch) + worker.sendToRespCh(resp, ch, true) return nil, nil } From 0deed227b36836ac8db15c456a46ff88e8bab04e Mon Sep 17 00:00:00 2001 From: qw4990 Date: Mon, 20 May 2019 19:54:15 +0800 Subject: [PATCH 2/2] fix import --- distsql/select_result.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distsql/select_result.go b/distsql/select_result.go index ae998477595db..db2ed3680596c 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,6 +14,7 @@ package distsql import ( + "fmt" "time" "github.com/pingcap/errors" @@ -25,8 +26,10 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" "golang.org/x/net/context" )