Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39023
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
xhebox authored and ti-chi-bot committed Nov 10, 2022
1 parent 113e2cc commit 464a011
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
53 changes: 28 additions & 25 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"runtime/trace"
"strconv"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -70,6 +69,8 @@ type HashJoinExec struct {

// closeCh add a lock for closing executor.
closeCh chan struct{}
worker util.WaitGroupWrapper
waiter util.WaitGroupWrapper
joinType plannercore.JoinType
requiredRows int64

Expand All @@ -92,9 +93,7 @@ type HashJoinExec struct {
prepared bool
isOuterJoin bool

// joinWorkerWaitGroup is for sync multiple join workers.
joinWorkerWaitGroup sync.WaitGroup
finished atomic.Value
finished atomic.Bool

stats *hashJoinRuntimeStats

Expand Down Expand Up @@ -152,6 +151,7 @@ func (e *HashJoinExec) Close() error {
e.probeChkResourceCh = nil
e.joinChkResourceCh = nil
terror.Call(e.rowContainer.Close)
e.waiter.Wait()
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
e.buildSideRows = nil
Expand All @@ -177,9 +177,10 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

e.worker = util.WaitGroupWrapper{}
e.waiter = util.WaitGroupWrapper{}
e.closeCh = make(chan struct{})
e.finished.Store(false)
e.joinWorkerWaitGroup = sync.WaitGroup{}

if e.probeTypes == nil {
e.probeTypes = retTypes(e.probeSideExec)
Expand All @@ -201,7 +202,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
hasWaitedForBuild := false
for {
if e.finished.Load().(bool) {
if e.finished.Load() {
return
}

Expand Down Expand Up @@ -279,24 +280,24 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {

// fetchBuildSideRows fetches all rows from build side executor, and append them
// to e.buildSideResult.
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, errCh chan<- error, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
errCh <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
if e.finished.Load() {
return
}
chk := chunk.NewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize)
err = Next(ctx, e.buildSideExec, chk)
if err != nil {
e.buildFinished <- errors.Trace(err)
errCh <- errors.Trace(err)
return
}
failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil)
Expand Down Expand Up @@ -353,8 +354,7 @@ func (e *HashJoinExec) initializeForProbe() {

func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
e.initializeForProbe()
e.joinWorkerWaitGroup.Add(1)
go util.WithRecovery(func() {
e.worker.RunWithRecover(func() {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.fetchProbeSideChunks(ctx)
}, e.handleProbeSideFetcherPanic)
Expand All @@ -369,14 +369,13 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
}

for i := uint(0); i < e.concurrency; i++ {
e.joinWorkerWaitGroup.Add(1)
workID := i
go util.WithRecovery(func() {
e.worker.RunWithRecover(func() {
defer trace.StartRegion(ctx, "HashJoinWorker").End()
e.runJoinWorker(workID, probeKeyColIdx, probeNAKeColIdx)
}, e.handleJoinWorkerPanic)
}
go util.WithRecovery(e.waitJoinWorkersAndCloseResultChan, nil)
e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil)
}

func (e *HashJoinExec) handleProbeSideFetcherPanic(r interface{}) {
Expand All @@ -386,14 +385,12 @@ func (e *HashJoinExec) handleProbeSideFetcherPanic(r interface{}) {
if r != nil {
e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
}
e.joinWorkerWaitGroup.Done()
}

func (e *HashJoinExec) handleJoinWorkerPanic(r interface{}) {
if r != nil {
e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
}
e.joinWorkerWaitGroup.Done()
}

// Concurrently handling unmatched rows from the hash table
Expand Down Expand Up @@ -433,15 +430,14 @@ func (e *HashJoinExec) handleUnmatchedRowsFromHashTable(workerID uint) {
}

func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
e.joinWorkerWaitGroup.Wait()
e.worker.Wait()
if e.useOuterToBuild {
// Concurrently handling unmatched rows from the hash table at the tail
for i := uint(0); i < e.concurrency; i++ {
var workerID = i
e.joinWorkerWaitGroup.Add(1)
go util.WithRecovery(func() { e.handleUnmatchedRowsFromHashTable(workerID) }, e.handleJoinWorkerPanic)
e.worker.RunWithRecover(func() { e.handleUnmatchedRowsFromHashTable(workerID) }, e.handleJoinWorkerPanic)
}
e.joinWorkerWaitGroup.Wait()
e.worker.Wait()
}
close(e.joinResultCh)
}
Expand Down Expand Up @@ -477,7 +473,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo
naKeyColIdx: probeNAKeyColIdx,
}
for ok := true; ok; {
if e.finished.Load().(bool) {
if e.finished.Load() {
break
}
select {
Expand Down Expand Up @@ -1114,7 +1110,14 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.rowContainerForProbe[i] = e.rowContainer.ShallowCopy()
}
}
<<<<<<< HEAD
go util.WithRecovery(func() {
=======
for i := uint(0); i < e.concurrency; i++ {
e.rowIters = append(e.rowIters, chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice))
}
e.worker.RunWithRecover(func() {
>>>>>>> 208478bf20 (executor: fix hashjoin goleak (#39023))
defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End()
e.fetchAndBuildHashTable(ctx)
}, e.handleFetchAndBuildHashTablePanic)
Expand Down Expand Up @@ -1157,10 +1160,10 @@ func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) {
buildSideResultCh := make(chan *chunk.Chunk, 1)
doneCh := make(chan struct{})
fetchBuildSideRowsOk := make(chan error, 1)
go util.WithRecovery(
e.worker.RunWithRecover(
func() {
defer trace.StartRegion(ctx, "HashJoinBuildSideFetcher").End()
e.fetchBuildSideRows(ctx, buildSideResultCh, doneCh)
e.fetchBuildSideRows(ctx, buildSideResultCh, fetchBuildSideRowsOk, doneCh)
},
func(r interface{}) {
if r != nil {
Expand Down Expand Up @@ -1207,7 +1210,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
}
for chk := range buildSideResultCh {
if e.finished.Load().(bool) {
if e.finished.Load() {
return nil
}
if !e.useOuterToBuild {
Expand Down
2 changes: 1 addition & 1 deletion util/wait_group_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r interfac
go func() {
defer func() {
r := recover()
if r != nil && recoverFn != nil {
if recoverFn != nil {
recoverFn(r)
}
w.Done()
Expand Down

0 comments on commit 464a011

Please sign in to comment.