From c7c65c68b121462b4933671a6380525a79483814 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 31 Jan 2023 17:33:42 +0800 Subject: [PATCH 1/3] save work Signed-off-by: xufei --- executor/index_merge_reader.go | 19 +++++++++++-------- executor/index_merge_reader_test.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 8dc359fa37163..b190899f46d06 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -612,8 +612,8 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *indexMergeTableTask util.WithRecovery( - func() { task = worker.pickAndExecTask(ctx1) }, - worker.handlePickAndExecTaskPanic(ctx1, task), + func() { worker.pickAndExecTask(ctx1, &task) }, + worker.handlePickAndExecTaskPanic(ctx1, &task), ) cancel() e.tblWorkerWg.Done() @@ -1107,12 +1107,12 @@ type indexMergeTableScanWorker struct { memTracker *memory.Tracker } -func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *indexMergeTableTask) { +func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **indexMergeTableTask) { var ok bool for { waitStart := time.Now() select { - case task, ok = <-w.workCh: + case *task, ok = <-w.workCh: if !ok { return } @@ -1120,17 +1120,18 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * return } execStart := time.Now() - err := w.executeTask(ctx, task) + err := w.executeTask(ctx, *task) if w.stats != nil { atomic.AddInt64(&w.stats.WaitTime, int64(execStart.Sub(waitStart))) atomic.AddInt64(&w.stats.FetchRow, int64(time.Since(execStart))) atomic.AddInt64(&w.stats.TableTaskNum, 1) } - task.doneCh <- err + failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil) + (*task).doneCh <- err } } -func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *indexMergeTableTask) func(r interface{}) { +func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **indexMergeTableTask) func(r interface{}) { return func(r interface{}) { if r == nil { return @@ -1138,7 +1139,9 @@ func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Conte err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) logutil.Logger(ctx).Error(err4Panic.Error()) - task.doneCh <- err4Panic + if *task != nil { + (*task).doneCh <- err4Panic + } } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 79d2d8b895a81..ab8b084912075 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -51,6 +51,25 @@ func TestSingleTableRead(t *testing.T) { tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6")) } +func TestIndexMergePickAndExecTaskPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)") + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic")) + }() + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id") + require.Contains(t, err.Error(), "testIndexMergePickAndExecTaskPanic") +} + func TestJoin(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From d711603f3d0ed52121589c1e84fb5d95e082972f Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 31 Jan 2023 17:50:28 +0800 Subject: [PATCH 2/3] refine Signed-off-by: xufei --- executor/index_merge_reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index ab8b084912075..9f21d416c34b9 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -62,12 +62,12 @@ func TestIndexMergePickAndExecTaskPanic(t *testing.T) { tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", "5 5 5 5 5")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic(\"pickAndExecTaskPanic\")")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic")) }() err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id") - require.Contains(t, err.Error(), "testIndexMergePickAndExecTaskPanic") + require.Contains(t, err.Error(), "pickAndExecTaskPanic") } func TestJoin(t *testing.T) { From 1566a2b6a84738706913579b629f9cc801a44768 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 1 Feb 2023 13:26:56 +0800 Subject: [PATCH 3/3] add more comments Signed-off-by: xufei --- executor/index_merge_reader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index b190899f46d06..895bcd2a4f744 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -612,6 +612,11 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *indexMergeTableTask util.WithRecovery( + // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic` + // because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible + // in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, + // so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is + // not visible in `handlePickAndExecTaskPanic` func() { worker.pickAndExecTask(ctx1, &task) }, worker.handlePickAndExecTaskPanic(ctx1, &task), )