Skip to content

Commit

Permalink
executor: reuse iterator4Slice and Row/RowPtrs slice in HashJoin. (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Jun 7, 2022
1 parent 30f7037 commit 8a4d457
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
15 changes: 8 additions & 7 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,24 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) {
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
innerPtrs := c.hashTable.Get(probeKey)
if len(innerPtrs) == 0 {
return
return nil, nil, err
}
matched = make([]chunk.Row, 0, len(innerPtrs))
matched = matched[:0]
var matchedRow chunk.Row
matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs))
matchedPtrs = matchedPtrs[:0]
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
if err != nil {
return
return nil, nil, err
}
var ok bool
ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
if err != nil {
return
return nil, nil, err
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
Expand All @@ -137,7 +138,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
matched = append(matched, matchedRow)
matchedPtrs = append(matchedPtrs, ptr)
}
return
return matched, matchedPtrs, err
}

// matchJoinKey checks if join keys of buildRow and probeRow are logically equal.
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool)
}
probeCtx.hasNull = make([]bool, 1)
probeCtx.hashVals = append(hCtx.hashVals, hashFunc())
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx)
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil)
require.NoError(t, err)
require.Equal(t, 2, len(matched))
require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes))
Expand Down
20 changes: 17 additions & 3 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type HashJoinExec struct {
finished atomic.Value

stats *hashJoinRuntimeStats

// We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently
buildSideRows [][]chunk.Row
buildSideRowPtrs [][]chunk.RowPtr
}

// probeChkResource stores the result of the join probe side fetch worker,
Expand Down Expand Up @@ -148,7 +152,8 @@ func (e *HashJoinExec) Close() error {
terror.Call(e.rowContainer.Close)
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]

e.buildSideRows = nil
e.buildSideRowPtrs = nil
if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
Expand Down Expand Up @@ -328,6 +333,9 @@ func (e *HashJoinExec) initializeForProbe() {
// e.joinResultCh is for transmitting the join result chunks to the main
// thread.
e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)

e.buildSideRows = make([][]chunk.Row, e.concurrency)
e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency)
}

func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
Expand Down Expand Up @@ -487,7 +495,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
}

func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -497,6 +507,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}

iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
var outerMatchStatus []outerRowStatusFlag
rowIdx, ok := 0, false
for iter.Begin(); iter.Current() != iter.End(); {
Expand All @@ -523,7 +534,9 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}
func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows := e.buildSideRows[workerID]
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -533,6 +546,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin
return true, joinResult
}
iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
hasMatch, hasNull, ok := false, false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk)
Expand Down
23 changes: 22 additions & 1 deletion util/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package chunk

import "sync"

var (
_ Iterator = (*Iterator4Chunk)(nil)
_ Iterator = (*iterator4RowPtr)(nil)
Expand All @@ -23,6 +25,22 @@ var (
_ Iterator = (*multiIterator)(nil)
)

var (
iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }}
)

// FreeIterator try to free and reuse the iterator.
func FreeIterator(it any) {
switch it := it.(type) {
case *iterator4Slice:
it.rows = nil
it.cursor = 0
iterator4SlicePool.Put(it)
default:
// Do Nothing.
}
}

// Iterator is used to iterate a number of rows.
//
// for row := it.Begin(); row != it.End(); row = it.Next() {
Expand Down Expand Up @@ -53,7 +71,10 @@ type Iterator interface {

// NewIterator4Slice returns a Iterator for Row slice.
func NewIterator4Slice(rows []Row) Iterator {
return &iterator4Slice{rows: rows}
it := iterator4SlicePool.Get().(*iterator4Slice)
it.rows = rows
it.cursor = 0
return it
}

type iterator4Slice struct {
Expand Down

0 comments on commit 8a4d457

Please sign in to comment.