From d5b2b2d5b8d6c90beb3086289d60b7b37efa6347 Mon Sep 17 00:00:00 2001 From: "TONG, Zhigao" Date: Wed, 4 Sep 2024 19:23:03 +0800 Subject: [PATCH] This is an automated cherry-pick of #55824 Signed-off-by: ti-chi-bot --- pkg/executor/index_lookup_hash_join.go | 39 ++++++++++++++++++++++--- pkg/executor/test/jointest/join_test.go | 6 ++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/pkg/executor/index_lookup_hash_join.go b/pkg/executor/index_lookup_hash_join.go index 06bb650d81ed3..a5bff8bb468c4 100644 --- a/pkg/executor/index_lookup_hash_join.go +++ b/pkg/executor/index_lookup_hash_join.go @@ -19,6 +19,7 @@ import ( "fmt" "hash" "hash/fnv" + "runtime" "runtime/trace" "sync" "sync/atomic" @@ -76,7 +77,11 @@ type IndexNestedLoopHashJoin struct { prepared bool // panicErr records the error generated by panic recover. This is introduced to // return the actual error message instead of `context cancelled` to the client. - panicErr error + panicErr struct { + sync.Mutex + atomic.Bool + error + } ctxWithCancel context.Context } @@ -194,13 +199,26 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r any) { if recoverdErr, ok := r.(error); ok { err = recoverdErr } +<<<<<<< HEAD:pkg/executor/index_lookup_hash_join.go if !e.keepOuterOrder { +======= + + if !e.panicErr.Load() { + e.panicErr.Lock() + if !e.panicErr.Load() { + e.panicErr.error = err + e.panicErr.Store(true) + } + e.panicErr.Unlock() + } + + if !e.KeepOuterOrder { +>>>>>>> 8cdd449c5fa (executor: fix data race in `IndexNestedLoopHashJoin` (#55824)):pkg/executor/join/index_lookup_hash_join.go e.resultCh <- &indexHashJoinResult{err: err} } else { task := &indexHashJoinTask{err: err} e.taskCh <- task } - e.panicErr = err if e.cancelFunc != nil { e.cancelFunc() } @@ -234,7 +252,10 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error { for { if e.isDryUpTasks(ctx) { - return e.panicErr + if e.panicErr.Load() { + return e.panicErr.error + } + return nil } if e.curTask.err != nil { return e.curTask.err @@ -290,7 +311,17 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu return nil, result.err } case <-ctx.Done(): - err := e.panicErr + failpoint.Inject("TestIssue49692", func() { + for !e.panicErr.Load() { + runtime.Gosched() + } + }) + + err := error(nil) + if e.panicErr.Load() { + err = e.panicErr.error + } + if err == nil { err = ctx.Err() } diff --git a/pkg/executor/test/jointest/join_test.go b/pkg/executor/test/jointest/join_test.go index de519cc21f421..79534fd0e7934 100644 --- a/pkg/executor/test/jointest/join_test.go +++ b/pkg/executor/test/jointest/join_test.go @@ -757,6 +757,12 @@ func TestIssue30211(t *testing.T) { tk.MustExec("drop table if exists t1, t2;") tk.MustExec("create table t1(a int, index(a));") tk.MustExec("create table t2(a int, index(a));") + fpName2 := "github.com/pingcap/tidb/pkg/executor/join/TestIssue49692" + require.NoError(t, failpoint.Enable(fpName2, `return`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName2)) + }() + func() { fpName := "github.com/pingcap/tidb/pkg/executor/TestIssue30211" require.NoError(t, failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`))