Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix inl_hash join performance regression because of Iterator4Slice #38741

Merged
merged 8 commits into from
Oct 31, 2022
16 changes: 9 additions & 7 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type indexHashJoinInnerWorker struct {
wg *sync.WaitGroup
joinKeyBuf []byte
outerRowStatus []outerRowStatusFlag
rowIter *chunk.Iterator4Slice
}

type indexHashJoinResult struct {
Expand Down Expand Up @@ -431,6 +432,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
matchedOuterPtrs: make([]chunk.RowPtr, 0, e.maxChunkSize),
joinKeyBuf: make([]byte, 1),
outerRowStatus: make([]outerRowStatusFlag, 0, e.maxChunkSize),
rowIter: chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice),
}
iw.memTracker.AttachTo(e.memTracker)
if len(copiedRanges) != 0 {
Expand Down Expand Up @@ -733,12 +735,11 @@ func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Contex
if len(matchedOuterRows) == 0 {
return true, joinResult
}
var (
ok bool
iter = chunk.NewIterator4Slice(matchedOuterRows)
cursor = 0
)
for iter.Begin(); iter.Current() != iter.End(); {
var ok bool
cursor := 0
iw.rowIter.Reset(matchedOuterRows)
iter := iw.rowIter
for iw.rowIter.Begin(); iter.Current() != iter.End(); {
iw.outerRowStatus, err = iw.joiner.tryToMatchOuters(iter, innerRow, joinResult.chk, iw.outerRowStatus)
if err != nil {
joinResult.err = err
Expand Down Expand Up @@ -821,7 +822,8 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind
for _, ptr := range innerRowPtrs {
matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr))
}
iter := chunk.NewIterator4Slice(matchedInnerRows)
iw.rowIter.Reset(matchedInnerRows)
iter := iw.rowIter
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := iw.joiner.tryToMatchInners(outerRow, iter, joinResult.chk)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type IndexLookUpJoin struct {

task *lookUpJoinTask
joinResult *chunk.Chunk
innerIter chunk.Iterator
innerIter *chunk.Iterator4Slice

joiner joiner
isOuterJoin bool
Expand Down Expand Up @@ -277,7 +277,10 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
startTime := time.Now()
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
e.lookUpMatchedInners(task, task.cursor)
e.innerIter = chunk.NewIterator4Slice(task.matchedInners)
if e.innerIter == nil {
e.innerIter = chunk.NewIterator4Slice(task.matchedInners).(*chunk.Iterator4Slice)
}
e.innerIter.Reset(task.matchedInners)
e.innerIter.Begin()
}

Expand Down
45 changes: 25 additions & 20 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type HashJoinExec struct {
// for every naaj probe worker, pre-allocate the int slice for store the join column index to check.
needCheckBuildRowPos [][]int
needCheckProbeRowPos [][]int

rowIters []*chunk.Iterator4Slice
}

// probeChkResource stores the result of the join probe side fetch worker,
Expand Down Expand Up @@ -527,8 +529,8 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
return true, joinResult
}

iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
iter := e.rowIters[workerID]
iter.Reset(buildSideRows)
var outerMatchStatus []outerRowStatusFlag
rowIdx, ok := 0, false
for iter.Begin(); iter.Current() != iter.End(); {
Expand Down Expand Up @@ -573,8 +575,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe
return false, joinResult
}
if len(buildSideRows) != 0 {
iter1 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter1)
iter1 := e.rowIters[workerID]
iter1.Reset(buildSideRows)
for iter1.Begin(); iter1.Current() != iter1.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftNotNullRightNotNull)
if err != nil {
Expand Down Expand Up @@ -608,8 +610,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter2 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter2)
iter2 := e.rowIters[workerID]
iter2.Reset(buildSideRows)
for iter2.Begin(); iter2.Current() != iter2.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftNotNullRightHasNull)
if err != nil {
Expand Down Expand Up @@ -648,8 +650,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe
return false, joinResult
}
if len(buildSideRows) != 0 {
iter1 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter1)
iter1 := e.rowIters[workerID]
iter1.Reset(buildSideRows)
for iter1.Begin(); iter1.Current() != iter1.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftHasNullRightHasNull)
if err != nil {
Expand Down Expand Up @@ -683,8 +685,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter2 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter2)
iter2 := e.rowIters[workerID]
iter2.Reset(buildSideRows)
for iter2.Begin(); iter2.Current() != iter2.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftHasNullRightNotNull)
if err != nil {
Expand Down Expand Up @@ -728,8 +730,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey
return false, joinResult
}
if len(buildSideRows) != 0 {
iter1 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter1)
iter1 := e.rowIters[workerID]
iter1.Reset(buildSideRows)
for iter1.Begin(); iter1.Current() != iter1.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk)
if err != nil {
Expand Down Expand Up @@ -763,8 +765,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter2 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter2)
iter2 := e.rowIters[workerID]
iter2.Reset(buildSideRows)
for iter2.Begin(); iter2.Current() != iter2.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk)
if err != nil {
Expand Down Expand Up @@ -803,8 +805,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey
return false, joinResult
}
if len(buildSideRows) != 0 {
iter1 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter1)
iter1 := e.rowIters[workerID]
iter1.Reset(buildSideRows)
for iter1.Begin(); iter1.Current() != iter1.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk)
if err != nil {
Expand Down Expand Up @@ -838,8 +840,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter2 := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter2)
iter2 := e.rowIters[workerID]
iter2.Reset(buildSideRows)
for iter2.Begin(); iter2.Current() != iter2.End(); {
matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk)
if err != nil {
Expand Down Expand Up @@ -912,8 +914,8 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
iter := e.rowIters[workerID]
iter.Reset(buildSideRows)
hasMatch, hasNull, ok := false, false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk)
Expand Down Expand Up @@ -1116,6 +1118,9 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.rowContainerForProbe[i] = e.rowContainer.ShallowCopy()
}
}
for i := uint(0); i < e.concurrency; i++ {
e.rowIters = append(e.rowIters, chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice))
}
go util.WithRecovery(func() {
defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End()
e.fetchAndBuildHashTable(ctx)
Expand Down
48 changes: 17 additions & 31 deletions util/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,15 @@

package chunk

import "sync"

var (
_ Iterator = (*Iterator4Chunk)(nil)
_ Iterator = (*iterator4RowPtr)(nil)
_ Iterator = (*iterator4List)(nil)
_ Iterator = (*iterator4Slice)(nil)
_ Iterator = (*Iterator4Slice)(nil)
_ Iterator = (*iterator4RowContainer)(nil)
_ Iterator = (*multiIterator)(nil)
)

var (
iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }}
)

// FreeIterator try to free and reuse the iterator.
func FreeIterator(it any) {
switch it := it.(type) {
case *iterator4Slice:
it.rows = nil
it.cursor = 0
iterator4SlicePool.Put(it)
default:
// Do Nothing.
}
}

// Iterator is used to iterate a number of rows.
//
// for row := it.Begin(); row != it.End(); row = it.Next() {
Expand Down Expand Up @@ -71,19 +53,17 @@ type Iterator interface {

// NewIterator4Slice returns a Iterator for Row slice.
func NewIterator4Slice(rows []Row) Iterator {
it := iterator4SlicePool.Get().(*iterator4Slice)
it.rows = rows
it.cursor = 0
return it
return &Iterator4Slice{rows: rows}
}

type iterator4Slice struct {
// Iterator4Slice is used to iterate rows inside a slice.
type Iterator4Slice struct {
rows []Row
cursor int
}

// Begin implements the Iterator interface.
func (it *iterator4Slice) Begin() Row {
func (it *Iterator4Slice) Begin() Row {
if it.Len() == 0 {
return it.End()
}
Expand All @@ -92,7 +72,7 @@ func (it *iterator4Slice) Begin() Row {
}

// Next implements the Iterator interface.
func (it *iterator4Slice) Next() Row {
func (it *Iterator4Slice) Next() Row {
if l := it.Len(); it.cursor >= l {
it.cursor = l + 1
return it.End()
Expand All @@ -103,30 +83,36 @@ func (it *iterator4Slice) Next() Row {
}

// Current implements the Iterator interface.
func (it *iterator4Slice) Current() Row {
func (it *Iterator4Slice) Current() Row {
if it.cursor == 0 || it.cursor > it.Len() {
return it.End()
}
return it.rows[it.cursor-1]
}

// End implements the Iterator interface.
func (*iterator4Slice) End() Row {
func (*Iterator4Slice) End() Row {
return Row{}
}

// ReachEnd implements the Iterator interface.
func (it *iterator4Slice) ReachEnd() {
func (it *Iterator4Slice) ReachEnd() {
it.cursor = it.Len() + 1
}

// Len implements the Iterator interface.
func (it *iterator4Slice) Len() int {
func (it *Iterator4Slice) Len() int {
return len(it.rows)
}

// Reset iterator.rows and cursor.
func (it *Iterator4Slice) Reset(rows []Row) {
it.rows = rows
it.cursor = 0
}

// Error returns none-nil error if anything wrong happens during the iteration.
func (*iterator4Slice) Error() error {
func (*Iterator4Slice) Error() error {
return nil
}

Expand Down