From 8a4d45706872141fe37ba5ea35d6b1833ffb4cad Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 7 Jun 2022 16:42:29 +0800 Subject: [PATCH] executor: reuse iterator4Slice and Row/RowPtrs slice in HashJoin. (#34878) ref pingcap/tidb#33877 --- executor/hash_table.go | 15 ++++++++------- executor/hash_table_test.go | 2 +- executor/join.go | 20 +++++++++++++++++--- util/chunk/iterator.go | 23 ++++++++++++++++++++++- 4 files changed, 48 insertions(+), 12 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index adf6f65832770..cd108fabb5031 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -112,23 +112,24 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer { // GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called // in multiple goroutines while each goroutine should keep its own // h and buf. -func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) { +func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) { + var err error innerPtrs := c.hashTable.Get(probeKey) if len(innerPtrs) == 0 { - return + return nil, nil, err } - matched = make([]chunk.Row, 0, len(innerPtrs)) + matched = matched[:0] var matchedRow chunk.Row - matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs)) + matchedPtrs = matchedPtrs[:0] for _, ptr := range innerPtrs { matchedRow, err = c.rowContainer.GetRow(ptr) if err != nil { - return + return nil, nil, err } var ok bool ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx) if err != nil { - return + return nil, nil, err } if !ok { atomic.AddInt64(&c.stat.probeCollision, 1) @@ -137,7 +138,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = append(matched, matchedRow) 
matchedPtrs = append(matchedPtrs, ptr) } - return + return matched, matchedPtrs, err } // matchJoinKey checks if join keys of buildRow and probeRow are logically equal. diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index e365f0165b84a..f5e70291efee3 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) } probeCtx.hasNull = make([]bool, 1) probeCtx.hashVals = append(hCtx.hashVals, hashFunc()) - matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx) + matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil) require.NoError(t, err) require.Equal(t, 2, len(matched)) require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes)) diff --git a/executor/join.go b/executor/join.go index 702a5cf23632c..b2076d459d82c 100644 --- a/executor/join.go +++ b/executor/join.go @@ -94,6 +94,10 @@ type HashJoinExec struct { finished atomic.Value stats *hashJoinRuntimeStats + + // We pre-allocate and reuse the Rows and RowPtrs slices for each probe goroutine to avoid frequent allocation. + buildSideRows [][]chunk.Row + buildSideRowPtrs [][]chunk.RowPtr } // probeChkResource stores the result of the join probe side fetch worker, @@ -148,7 +152,8 @@ func (e *HashJoinExec) Close() error { terror.Call(e.rowContainer.Close) } e.outerMatchedStatus = e.outerMatchedStatus[:0] - + e.buildSideRows = nil + e.buildSideRowPtrs = nil if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat } @@ -328,6 +333,9 @@ func (e *HashJoinExec) initializeForProbe() { // e.joinResultCh is for transmitting the join result chunks to the main // thread. 
e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) + + e.buildSideRows = make([][]chunk.Row, e.concurrency) + e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { @@ -487,7 +495,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { } func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) + var err error + e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) + buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -497,6 +507,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } iter := chunk.NewIterator4Slice(buildSideRows) + defer chunk.FreeIterator(iter) var outerMatchStatus []outerRowStatusFlag rowIdx, ok := 0, false for iter.Begin(); iter.Current() != iter.End(); { @@ -523,7 +534,9 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) + var err error + e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) + buildSideRows := 
e.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -533,6 +546,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin return true, joinResult } iter := chunk.NewIterator4Slice(buildSideRows) + defer chunk.FreeIterator(iter) hasMatch, hasNull, ok := false, false, false for iter.Begin(); iter.Current() != iter.End(); { matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) diff --git a/util/chunk/iterator.go b/util/chunk/iterator.go index ed9905b916a93..e62ef08212349 100644 --- a/util/chunk/iterator.go +++ b/util/chunk/iterator.go @@ -14,6 +14,8 @@ package chunk +import "sync" + var ( _ Iterator = (*Iterator4Chunk)(nil) _ Iterator = (*iterator4RowPtr)(nil) @@ -23,6 +25,22 @@ var ( _ Iterator = (*multiIterator)(nil) ) +var ( + iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }} +) + +// FreeIterator resets the iterator and returns it to its pool for reuse; iterator types without a pool are ignored. +func FreeIterator(it any) { + switch it := it.(type) { + case *iterator4Slice: + it.rows = nil + it.cursor = 0 + iterator4SlicePool.Put(it) + default: + // Do Nothing. + } +} + // Iterator is used to iterate a number of rows. // // for row := it.Begin(); row != it.End(); row = it.Next() { @@ -53,7 +71,10 @@ type Iterator interface { // NewIterator4Slice returns a Iterator for Row slice. func NewIterator4Slice(rows []Row) Iterator { - return &iterator4Slice{rows: rows} + it := iterator4SlicePool.Get().(*iterator4Slice) + it.rows = rows + it.cursor = 0 + return it } type iterator4Slice struct {