diff --git a/pkg/executor/index_lookup_hash_join.go b/pkg/executor/index_lookup_hash_join.go index 9f1d7a94927b7..4783c4ccf1f79 100644 --- a/pkg/executor/index_lookup_hash_join.go +++ b/pkg/executor/index_lookup_hash_join.go @@ -19,6 +19,7 @@ import ( "fmt" "hash" "hash/fnv" + "runtime" "runtime/trace" "sync" "sync/atomic" @@ -76,7 +77,11 @@ type IndexNestedLoopHashJoin struct { prepared bool // panicErr records the error generated by panic recover. This is introduced to // return the actual error message instead of `context cancelled` to the client. - panicErr error + panicErr struct { + sync.Mutex + atomic.Bool + error + } ctxWithCancel context.Context } @@ -192,13 +197,29 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { e.IndexLookUpJoin.finished.Store(true) err := fmt.Errorf("%v", r) + + if !e.panicErr.Load() { + e.panicErr.Lock() + if !e.panicErr.Load() { + e.panicErr.error = err + e.panicErr.Store(true) + } + e.panicErr.Unlock() + } + + failpoint.Inject("TestIssue49692", func() { + failpoint.Goto("TestIssue49692End") + }) + if !e.keepOuterOrder { e.resultCh <- &indexHashJoinResult{err: err} } else { task := &indexHashJoinTask{err: err} e.taskCh <- task } - e.panicErr = err + + failpoint.Label("TestIssue49692End") + if e.cancelFunc != nil { e.cancelFunc() } @@ -232,7 +253,7 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error { for { if e.isDryUpTasks(ctx) { - return e.panicErr + return e.getPanicErr() } if e.curTask.err != nil { return e.curTask.err @@ -274,6 +295,13 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool { return false } +func (e *IndexNestedLoopHashJoin) getPanicErr() error { + if e.panicErr.Load() { + return e.panicErr.error + } + return nil +} + func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) { var ( result *indexHashJoinResult @@ -288,7 +316,14 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu return nil, result.err } case <-ctx.Done(): - err := e.panicErr + failpoint.Inject("TestIssue49692", func() { + for !e.panicErr.Load() { + runtime.Gosched() + } + }) + + err := e.getPanicErr() + if err == nil { err = ctx.Err() } diff --git a/pkg/executor/test/jointest/join_test.go b/pkg/executor/test/jointest/join_test.go index 684202925e2b8..547bd13890bd9 100644 --- a/pkg/executor/test/jointest/join_test.go +++ b/pkg/executor/test/jointest/join_test.go @@ -1367,6 +1367,24 @@ func TestIssue30211(t *testing.T) { err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;") require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic") }() + + func() { + fpName := "github.com/pingcap/tidb/pkg/executor/TestIssue30211" + require.NoError(t, failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`)) + fpName2 := "github.com/pingcap/tidb/pkg/executor/TestIssue49692" + require.NoError(t, failpoint.Enable(fpName2, `return`)) + + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + require.NoError(t, failpoint.Disable(fpName2)) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;") + require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic") + + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;") + require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic") + }() + tk.MustExec("insert into t1 values(1),(2);") tk.MustExec("insert into t2 values(1),(1),(2),(2);") tk.MustExec("set @@tidb_mem_quota_query=8000;") @@ -1377,6 +1395,18 @@ func TestIssue30211(t *testing.T) { require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery)) err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery)) + + func() { + fpName2 := "github.com/pingcap/tidb/pkg/executor/TestIssue49692" + require.NoError(t, failpoint.Enable(fpName2, `return`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName2)) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a order by t2.a;").Error() + require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery)) + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a order by t2.a;").Error() + require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery)) + }() } func TestIssue37932(t *testing.T) {