Skip to content

Commit

Permalink
executor: fix data race in IndexNestedLoopHashJoin (#55824)
Browse files Browse the repository at this point in the history
close #49692
  • Loading branch information
solotzg authored Sep 4, 2024
1 parent e00454a commit 8cdd449
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
35 changes: 31 additions & 4 deletions pkg/executor/join/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"runtime"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -76,7 +77,11 @@ type IndexNestedLoopHashJoin struct {
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
panicErr struct {
sync.Mutex
atomic.Bool
error
}
ctxWithCancel context.Context
}

Expand Down Expand Up @@ -194,13 +199,22 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r any) {
if recoverdErr, ok := r.(error); ok {
err = recoverdErr
}

if !e.panicErr.Load() {
e.panicErr.Lock()
if !e.panicErr.Load() {
e.panicErr.error = err
e.panicErr.Store(true)
}
e.panicErr.Unlock()
}

if !e.KeepOuterOrder {
e.resultCh <- &indexHashJoinResult{err: err}
} else {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down Expand Up @@ -234,7 +248,10 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
for {
if e.isDryUpTasks(ctx) {
return e.panicErr
if e.panicErr.Load() {
return e.panicErr.error
}
return nil
}
if e.curTask.err != nil {
return e.curTask.err
Expand Down Expand Up @@ -290,7 +307,17 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
failpoint.Inject("TestIssue49692", func() {
for !e.panicErr.Load() {
runtime.Gosched()
}
})

err := error(nil)
if e.panicErr.Load() {
err = e.panicErr.error
}

if err == nil {
err = ctx.Err()
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/executor/test/jointest/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,12 @@ func TestIssue30211(t *testing.T) {
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int, index(a));")
tk.MustExec("create table t2(a int, index(a));")
fpName2 := "github.com/pingcap/tidb/pkg/executor/join/TestIssue49692"
require.NoError(t, failpoint.Enable(fpName2, `return`))
defer func() {
require.NoError(t, failpoint.Disable(fpName2))
}()

func() {
fpName := "github.com/pingcap/tidb/pkg/executor/join/TestIssue30211"
require.NoError(t, failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`))
Expand Down

0 comments on commit 8cdd449

Please sign in to comment.