Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55824
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
solotzg authored and ti-chi-bot committed Sep 6, 2024
1 parent 2f84b4b commit 9d40e67
Show file tree
Hide file tree
Showing 2 changed files with 945 additions and 4 deletions.
42 changes: 38 additions & 4 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"runtime"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -75,7 +76,11 @@ type IndexNestedLoopHashJoin struct {
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
panicErr struct {
sync.Mutex
atomic.Bool
error
}
ctxWithCancel context.Context
}

Expand Down Expand Up @@ -191,13 +196,29 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
if r != nil {
e.IndexLookUpJoin.finished.Store(true)
err := fmt.Errorf("%v", r)
<<<<<<< HEAD:executor/index_lookup_hash_join.go
if !e.keepOuterOrder {
=======
if recoverdErr, ok := r.(error); ok {
err = recoverdErr
}

if !e.panicErr.Load() {
e.panicErr.Lock()
if !e.panicErr.Load() {
e.panicErr.error = err
e.panicErr.Store(true)
}
e.panicErr.Unlock()
}

if !e.KeepOuterOrder {
>>>>>>> 8cdd449c5fa (executor: fix data race in `IndexNestedLoopHashJoin` (#55824)):pkg/executor/join/index_lookup_hash_join.go
e.resultCh <- &indexHashJoinResult{err: err}
} else {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down Expand Up @@ -231,7 +252,10 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
for {
if e.isDryUpTasks(ctx) {
return e.panicErr
if e.panicErr.Load() {
return e.panicErr.error
}
return nil
}
if e.curTask.err != nil {
return e.curTask.err
Expand Down Expand Up @@ -287,7 +311,17 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
failpoint.Inject("TestIssue49692", func() {
for !e.panicErr.Load() {
runtime.Gosched()
}
})

err := error(nil)
if e.panicErr.Load() {
err = e.panicErr.error
}

if err == nil {
err = ctx.Err()
}
Expand Down
Loading

0 comments on commit 9d40e67

Please sign in to comment.