Skip to content

Commit

Permalink
executor: fix data race in IndexNestedLoopHashJoin (pingcap#55824) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 10, 2024
1 parent 8fa6619 commit 5665343
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 4 deletions.
43 changes: 39 additions & 4 deletions pkg/executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"runtime"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -76,7 +77,11 @@ type IndexNestedLoopHashJoin struct {
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
panicErr struct {
sync.Mutex
atomic.Bool
error
}
ctxWithCancel context.Context
}

Expand Down Expand Up @@ -192,13 +197,29 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
if r != nil {
e.IndexLookUpJoin.finished.Store(true)
err := fmt.Errorf("%v", r)

if !e.panicErr.Load() {
e.panicErr.Lock()
if !e.panicErr.Load() {
e.panicErr.error = err
e.panicErr.Store(true)
}
e.panicErr.Unlock()
}

failpoint.Inject("TestIssue49692", func() {
failpoint.Goto("TestIssue49692End")
})

if !e.keepOuterOrder {
e.resultCh <- &indexHashJoinResult{err: err}
} else {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err

failpoint.Label("TestIssue49692End")

if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down Expand Up @@ -232,7 +253,7 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
for {
if e.isDryUpTasks(ctx) {
return e.panicErr
return e.getPanicErr()
}
if e.curTask.err != nil {
return e.curTask.err
Expand Down Expand Up @@ -274,6 +295,13 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
return false
}

func (e *IndexNestedLoopHashJoin) getPanicErr() error {
if e.panicErr.Load() {
return e.panicErr.error
}
return nil
}

func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
var (
result *indexHashJoinResult
Expand All @@ -288,7 +316,14 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
failpoint.Inject("TestIssue49692", func() {
for !e.panicErr.Load() {
runtime.Gosched()
}
})

err := e.getPanicErr()

if err == nil {
err = ctx.Err()
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/executor/test/jointest/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,24 @@ func TestIssue30211(t *testing.T) {
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;")
require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic")
}()

func() {
fpName := "github.com/pingcap/tidb/pkg/executor/TestIssue30211"
require.NoError(t, failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`))
fpName2 := "github.com/pingcap/tidb/pkg/executor/TestIssue49692"
require.NoError(t, failpoint.Enable(fpName2, `return`))

defer func() {
require.NoError(t, failpoint.Disable(fpName))
require.NoError(t, failpoint.Disable(fpName2))
}()
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;")
require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic")

err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;")
require.EqualError(t, err, "failpoint panic: TestIssue30211 IndexJoinPanic")
}()

tk.MustExec("insert into t1 values(1),(2);")
tk.MustExec("insert into t2 values(1),(1),(2),(2);")
tk.MustExec("set @@tidb_mem_quota_query=8000;")
Expand All @@ -1377,6 +1395,18 @@ func TestIssue30211(t *testing.T) {
require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery))
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery))

func() {
fpName2 := "github.com/pingcap/tidb/pkg/executor/TestIssue49692"
require.NoError(t, failpoint.Enable(fpName2, `return`))
defer func() {
require.NoError(t, failpoint.Disable(fpName2))
}()
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a order by t2.a;").Error()
require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery))
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a order by t2.a;").Error()
require.True(t, strings.Contains(err, memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForSingleQuery))
}()
}

func TestIssue37932(t *testing.T) {
Expand Down

0 comments on commit 5665343

Please sign in to comment.