executor: decrease the memory usage of hashTable in HashJoinExec (#11832)
SunRunAway authored Aug 29, 2019
1 parent 39f83c1 commit bdbaeb4
Showing 12 changed files with 732 additions and 253 deletions.
206 changes: 197 additions & 9 deletions executor/benchmark_test.go
@@ -31,19 +31,22 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
)

var (
_ Executor = &mockDataSource{}
)

type mockDataSourceParameters struct {
- schema *expression.Schema
- ndvs   []int  // number of distinct values on columns[i] and zero represents no limit
- orders []bool // columns[i] should be ordered if orders[i] is true
- rows   int    // number of rows the DataSource should output
- ctx    sessionctx.Context
+ schema      *expression.Schema
+ genDataFunc func(row int, typ *types.FieldType) interface{}
+ ndvs        []int  // number of distinct values on columns[i] and zero represents no limit
+ orders      []bool // columns[i] should be ordered if orders[i] is true
+ rows        int    // number of rows the DataSource should output
+ ctx         sessionctx.Context
}

type mockDataSource struct {
@@ -56,11 +59,21 @@ type mockDataSource struct {

func (mds *mockDataSource) genColDatums(col int) (results []interface{}) {
typ := mds.retFieldTypes[col]
- order := mds.p.orders[col]
+ order := false
+ if col < len(mds.p.orders) {
+     order = mds.p.orders[col]
+ }
rows := mds.p.rows
- NDV := mds.p.ndvs[col]
+ NDV := 0
+ if col < len(mds.p.ndvs) {
+     NDV = mds.p.ndvs[col]
+ }
results = make([]interface{}, 0, rows)
- if NDV == 0 {
+ if mds.p.genDataFunc != nil {
+     for i := 0; i < rows; i++ {
+         results = append(results, mds.p.genDataFunc(i, typ))
+     }
+ } else if NDV == 0 {
for i := 0; i < rows; i++ {
results = append(results, mds.randDatum(typ))
}
@@ -184,7 +197,7 @@ func (a aggTestCase) columns() []*expression.Column {
}

func (a aggTestCase) String() string {
return fmt.Sprintf("(execType:%v, aggFunc:%v, ndv:%v, hasDistinct:%v, rows:%v, concruuency:%v)",
return fmt.Sprintf("(execType:%v, aggFunc:%v, ndv:%v, hasDistinct:%v, rows:%v, concurrency:%v)",
a.execType, a.aggFunc, a.groupByNDV, a.hasDistinct, a.rows, a.concurrency)
}

@@ -503,3 +516,178 @@ func BenchmarkWindowFunctions(b *testing.B) {
})
}
}

type hashJoinTestCase struct {
rows int
concurrency int
ctx sessionctx.Context
keyIdx []int
}

func (tc hashJoinTestCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeVarString)},
}
}

func (tc hashJoinTestCase) String() string {
return fmt.Sprintf("(rows:%v, concurency:%v, joinKeyIdx: %v)",
tc.rows, tc.concurrency, tc.keyIdx)
}

func defaultHashJoinTestCase() *hashJoinTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1) // -1 means no memory limit
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
return tc
}

func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
cols0 := testCase.columns()
cols1 := testCase.columns()
joinSchema := expression.NewSchema(cols0...)
joinSchema.Append(cols1...)
joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx))
for _, keyIdx := range testCase.keyIdx {
joinKeys = append(joinKeys, cols0[keyIdx])
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
concurrency: uint(testCase.concurrency),
joinType: 0, // InnerJoin
isOuterJoin: false,
innerKeys: joinKeys,
outerKeys: joinKeys,
innerExec: innerExec,
outerExec: outerExec,
}
defaultValues := make([]types.Datum, e.innerExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes)
}
return e
}
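// Note: both mock inputs share the same schema, so the columns picked by
// keyIdx serve as both innerKeys and outerKeys above.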

func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(casTest.columns()...),
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
default:
panic("not implement")
}
},
}
dataSource1 := buildMockDataSource(opt)
dataSource2 := buildMockDataSource(opt)

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
exec := prepare4Join(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource1.prepareChunks()
dataSource2.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}
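// Measurement note: the StopTimer/StartTimer pairs keep executor construction
// and mock-data preparation out of the timed region, so each iteration
// measures only Open, the Next loop, and Close of the HashJoinExec.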

func BenchmarkHashJoinExec(b *testing.B) {
b.ReportAllocs()
cas := defaultHashJoinTestCase()
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkHashJoinExecWithCase(b, cas)
})

cas.keyIdx = []int{0}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkHashJoinExecWithCase(b, cas)
})
}

func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(casTest.columns()...),
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
default:
panic("not implement")
}
},
}
dataSource1 := buildMockDataSource(opt)
dataSource2 := buildMockDataSource(opt)

dataSource1.prepareChunks()
exec := prepare4Join(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
innerResultCh := make(chan *chunk.Chunk, 1)
go func() {
for _, chk := range dataSource1.genData {
innerResultCh <- chk
}
close(innerResultCh)
}()

b.StartTimer()
if err := exec.buildHashTableForList(innerResultCh); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkBuildHashTableForList(b *testing.B) {
b.ReportAllocs()
cas := defaultHashJoinTestCase()
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkBuildHashTableForList(b, cas)
})

cas.keyIdx = []int{0}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkBuildHashTableForList(b, cas)
})
}
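These benchmarks can be run with the standard Go tooling; a typical invocation (flags and package path assumed here, not part of the commit) is:

go test -run NONE -bench 'BenchmarkHashJoinExec|BenchmarkBuildHashTableForList' -benchmem github.com/pingcap/tidb/executor

The -benchmem flag reports allocations per iteration, which is the relevant metric for a memory-usage change like this one.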
106 changes: 106 additions & 0 deletions executor/hash_table.go
@@ -0,0 +1,106 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/pingcap/tidb/util/chunk"
)

const maxEntrySliceLen = 8 * 1024

type entry struct {
ptr chunk.RowPtr
next entryAddr
}

type entryStore struct {
slices [][]entry
sliceIdx uint32
sliceLen uint32
}

func (es *entryStore) put(e entry) entryAddr {
if es.sliceLen == maxEntrySliceLen {
es.slices = append(es.slices, make([]entry, 0, maxEntrySliceLen))
es.sliceLen = 0
es.sliceIdx++
}
addr := entryAddr{sliceIdx: es.sliceIdx, offset: es.sliceLen}
es.slices[es.sliceIdx] = append(es.slices[es.sliceIdx], e)
es.sliceLen++
return addr
}
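// Layout note: each slice is capped at maxEntrySliceLen entries; once the
// current slice is full, put starts a fresh one instead of growing it, so
// existing entries are never reallocated and any entryAddr handed out
// earlier remains valid.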

func (es *entryStore) get(addr entryAddr) entry {
return es.slices[addr.sliceIdx][addr.offset]
}

type entryAddr struct {
sliceIdx uint32
offset uint32
}

// nullEntryAddr is the zero value of entryAddr; the first slot of the
// entryStore is reserved in newRowHashMap so that this zero value can act
// as the end-of-chain marker.
var nullEntryAddr = entryAddr{}

// rowHashMap stores the rowPtrs of rows for a given hash key with minimal GC
// overhead; a single key can map to multiple rowPtrs.
// It is not thread-safe and should only be used within one goroutine.
type rowHashMap struct {
entryStore entryStore
hashTable map[uint64]entryAddr
length int
}

// newRowHashMap creates a new rowHashMap.
func newRowHashMap() *rowHashMap {
m := new(rowHashMap)
// TODO(fengliyuan): initialize the size of map from the estimated row count for better performance.
m.hashTable = make(map[uint64]entryAddr)
m.entryStore.slices = [][]entry{make([]entry, 0, 64)}
// Reserve the first empty entry, so entryAddr{} can represent nullEntryAddr.
m.entryStore.put(entry{})
return m
}

// Put inserts a key/rowPtr pair into the rowHashMap; multiple rowPtrs for
// the same key are chained in a list.
func (m *rowHashMap) Put(hashKey uint64, rowPtr chunk.RowPtr) {
// Prepend to the chain: the new entry points at the previous head, or at
// nullEntryAddr if this is the first entry for the key.
oldEntryAddr := m.hashTable[hashKey]
e := entry{
ptr: rowPtr,
next: oldEntryAddr,
}
newEntryAddr := m.entryStore.put(e)
m.hashTable[hashKey] = newEntryAddr
m.length++
}

// Get returns all the rowPtrs stored for the given hashKey, in insertion order.
func (m *rowHashMap) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
entryAddr := m.hashTable[hashKey]
for entryAddr != nullEntryAddr {
e := m.entryStore.get(entryAddr)
entryAddr = e.next
rowPtrs = append(rowPtrs, e.ptr)
}
// Entries were prepended in Put, so reverse rowPtrs to restore input order.
for i := 0; i < len(rowPtrs)/2; i++ {
j := len(rowPtrs) - 1 - i
rowPtrs[i], rowPtrs[j] = rowPtrs[j], rowPtrs[i]
}
return
}

// Len returns the number of rowPtrs in the rowHashMap; the number of
// distinct keys may be smaller than Len if the same key is Put more than once.
func (m *rowHashMap) Len() int { return m.length }
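The point of this layout is GC economics: compared with a straightforward map[uint64][]chunk.RowPtr, the map here holds a single small fixed-size value per key, and all entries live in a few large slices, so the collector has far fewer objects and pointers to scan while duplicate keys still chain together. A minimal usage sketch (a hypothetical helper, assuming it sits in package executor next to this file and that fmt is imported):

func exampleRowHashMap() {
	m := newRowHashMap()
	// Two rows land on the same hash key; both pointers are kept.
	m.Put(42, chunk.RowPtr{ChkIdx: 0, RowIdx: 3})
	m.Put(42, chunk.RowPtr{ChkIdx: 1, RowIdx: 7})
	fmt.Println(m.Get(42)) // [{0 3} {1 7}], insertion order preserved
	fmt.Println(m.Len())   // 2
}

Note that entry holds only uint32 index fields and chunk.RowPtr is likewise pointer-free, so both the map's values and the entry slices contain no Go pointers and the collector can largely skip scanning their contents.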