From 95e13afa13958767d107f731274b5afe1679cf49 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 22 Jun 2022 19:10:38 +0800 Subject: [PATCH] *: track the memory usage in Insert/Update/Delete executors (#34097) close pingcap/tidb#34096 --- executor/delete.go | 15 ++++-- executor/update.go | 29 +++++++---- kv/key.go | 93 ++++++++++++++++++++++++++++++++-- kv/key_test.go | 81 +++++++++++++++++++++++++++++ statistics/row_sampler.go | 2 +- util/hack/hack.go | 5 ++ util/set/mem_aware_map.go | 72 ++++++++++++++++++++++++++ util/set/mem_aware_map_test.go | 80 +++++++++++++++++++++++++++++ 8 files changed, 357 insertions(+), 20 deletions(-) create mode 100644 util/set/mem_aware_map.go create mode 100644 util/set/mem_aware_map_test.go diff --git a/executor/delete.go b/executor/delete.go index d72eae827d9e2..5e759e12db25c 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -158,20 +158,26 @@ func (e *DeleteExec) doBatchDelete(ctx context.Context) error { } func (e *DeleteExec) composeTblRowMap(tblRowMap tableRowMapType, colPosInfos []plannercore.TblColPosInfo, joinedRow []types.Datum) error { - // iterate all the joined tables, and got the copresonding rows in joinedRow. + // iterate all the joined tables, and got the corresponding rows in joinedRow. for _, info := range colPosInfos { if unmatchedOuterRow(info, joinedRow) { continue } if tblRowMap[info.TblID] == nil { - tblRowMap[info.TblID] = kv.NewHandleMap() + tblRowMap[info.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]() } handle, err := info.HandleCols.BuildHandleByDatums(joinedRow) if err != nil { return err } // tblRowMap[info.TblID][handle] hold the row datas binding to this table and this handle. - tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End]) + _, exist := tblRowMap[info.TblID].Get(handle) + memDelta := tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End]) + if !exist { + memDelta += types.EstimatedMemUsage(joinedRow, 1) + memDelta += int64(handle.ExtraMemSize()) + } + e.memTracker.Consume(memDelta) } return nil } @@ -240,6 +246,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl // Close implements the Executor Close interface. func (e *DeleteExec) Close() error { + defer e.memTracker.ReplaceBytesUsed(0) return e.children[0].Close() } @@ -254,4 +261,4 @@ func (e *DeleteExec) Open(ctx context.Context) error { // tableRowMapType is a map for unique (Table, Row) pair. key is the tableID. // the key in map[int64]Row is the joined table handle, which represent a unique reference row. // the value in map[int64]Row is the deleting row. -type tableRowMapType map[int64]*kv.HandleMap +type tableRowMapType map[int64]*kv.MemAwareHandleMap[[]types.Datum] diff --git a/executor/update.go b/executor/update.go index faf5b1d15e1bf..196f737aa057f 100644 --- a/executor/update.go +++ b/executor/update.go @@ -42,11 +42,11 @@ type UpdateExec struct { // updatedRowKeys is a map for unique (TableAlias, handle) pair. // The value is true if the row is changed, or false otherwise - updatedRowKeys map[int]*kv.HandleMap + updatedRowKeys map[int]*kv.MemAwareHandleMap[bool] tblID2table map[int64]table.Table // mergedRowData is a map for unique (Table, handle) pair. // The value is cached table row - mergedRowData map[int64]*kv.HandleMap + mergedRowData map[int64]*kv.MemAwareHandleMap[[]types.Datum] multiUpdateOnSameTable map[int64]bool matched uint64 // a counter of matched rows during update @@ -71,7 +71,7 @@ type UpdateExec struct { // prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations. func (e *UpdateExec) prepare(row []types.Datum) (err error) { if e.updatedRowKeys == nil { - e.updatedRowKeys = make(map[int]*kv.HandleMap) + e.updatedRowKeys = make(map[int]*kv.MemAwareHandleMap[bool]) } e.handles = e.handles[:0] e.tableUpdatable = e.tableUpdatable[:0] @@ -79,7 +79,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) { e.matches = e.matches[:0] for _, content := range e.tblColPosInfos { if e.updatedRowKeys[content.Start] == nil { - e.updatedRowKeys[content.Start] = kv.NewHandleMap() + e.updatedRowKeys[content.Start] = kv.NewMemAwareHandleMap[bool]() } handle, err := content.HandleCols.BuildHandleByDatums(row) if err != nil { @@ -102,7 +102,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) { changed, ok := e.updatedRowKeys[content.Start].Get(handle) if ok { - e.changed = append(e.changed, changed.(bool)) + e.changed = append(e.changed, changed) e.matches = append(e.matches, false) } else { e.changed = append(e.changed, false) @@ -114,7 +114,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) { func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) error { if e.mergedRowData == nil { - e.mergedRowData = make(map[int64]*kv.HandleMap) + e.mergedRowData = make(map[int64]*kv.MemAwareHandleMap[[]types.Datum]) } var mergedData []types.Datum // merge updates from and into mergedRowData @@ -135,13 +135,13 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro flags := e.assignFlag[content.Start:content.End] if e.mergedRowData[content.TblID] == nil { - e.mergedRowData[content.TblID] = kv.NewHandleMap() + e.mergedRowData[content.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]() } tbl := e.tblID2table[content.TblID] oldData := row[content.Start:content.End] newTableData := newData[content.Start:content.End] if v, ok := e.mergedRowData[content.TblID].Get(handle); ok { - mergedData = v.([]types.Datum) + mergedData = v for i, flag := range flags { if tbl.WritableCols()[i].IsGenerated() != mergeGenerated { continue @@ -156,7 +156,10 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro } else { mergedData = append([]types.Datum{}, newTableData...) } - e.mergedRowData[content.TblID].Set(handle, mergedData) + + memDelta := e.mergedRowData[content.TblID].Set(handle, mergedData) + memDelta += types.EstimatedMemUsage(mergedData, 1) + int64(handle.ExtraMemSize()) + e.memTracker.Consume(memDelta) } return nil } @@ -190,7 +193,12 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n // Update row changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker) if err1 == nil { - e.updatedRowKeys[content.Start].Set(handle, changed) + _, exist := e.updatedRowKeys[content.Start].Get(handle) + memDelta := e.updatedRowKeys[content.Start].Set(handle, changed) + if !exist { + memDelta += int64(handle.ExtraMemSize()) + } + e.memTracker.Consume(memDelta) continue } @@ -426,6 +434,7 @@ func (e *UpdateExec) Close() error { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil) } } + defer e.memTracker.ReplaceBytesUsed(0) return e.children[0].Close() } diff --git a/kv/key.go b/kv/key.go index 3e68b5fc80dd7..561cc2a03fd78 100644 --- a/kv/key.go +++ b/kv/key.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/set" ) // Key represents high-level Key type. @@ -158,9 +159,15 @@ type Handle interface { // String implements the fmt.Stringer interface. String() string // MemUsage returns the memory usage of a handle. - MemUsage() int64 + MemUsage() uint64 + // ExtraMemSize returns the memory usage of objects that are pointed to by the Handle. + ExtraMemSize() uint64 } +var _ Handle = IntHandle(0) +var _ Handle = &CommonHandle{} +var _ Handle = PartitionHandle{} + // IntHandle implement the Handle interface for int64 type handle. type IntHandle int64 @@ -231,10 +238,15 @@ func (ih IntHandle) String() string { } // MemUsage implements the Handle interface. -func (ih IntHandle) MemUsage() int64 { +func (ih IntHandle) MemUsage() uint64 { return 8 } +// ExtraMemSize implements the Handle interface. +func (ih IntHandle) ExtraMemSize() uint64 { + return 0 +} + // CommonHandle implements the Handle interface for non-int64 type handle. type CommonHandle struct { encoded []byte @@ -355,8 +367,15 @@ func (ch *CommonHandle) String() string { } // MemUsage implements the Handle interface. -func (ch *CommonHandle) MemUsage() int64 { - return int64(cap(ch.encoded)) + int64(cap(ch.colEndOffsets))*2 +func (ch *CommonHandle) MemUsage() uint64 { + // 48 is used by the 2 slice fields. + return 48 + ch.ExtraMemSize() +} + +// ExtraMemSize implements the Handle interface. +func (ch *CommonHandle) ExtraMemSize() uint64 { + // colEndOffsets is a slice of uint16. + return uint64(cap(ch.encoded) + cap(ch.colEndOffsets)*2) } // HandleMap is the map for Handle. @@ -431,6 +450,65 @@ func (m *HandleMap) Range(fn func(h Handle, val interface{}) bool) { } } +// MemAwareHandleMap is similar to HandleMap, but it's aware of its memory usage and doesn't support delete. +// It only tracks the actual sizes. Objects that are pointed to by the key or value are not tracked. +// Those should be tracked by the caller. +type MemAwareHandleMap[V any] struct { + ints set.MemAwareMap[int64, V] + strs set.MemAwareMap[string, strHandleValue[V]] +} + +type strHandleValue[V any] struct { + h Handle + val V +} + +// NewMemAwareHandleMap creates a new map for handle. +func NewMemAwareHandleMap[V any]() *MemAwareHandleMap[V] { + // Initialize the two maps to avoid checking nil. + return &MemAwareHandleMap[V]{ + ints: set.NewMemAwareMap[int64, V](), + strs: set.NewMemAwareMap[string, strHandleValue[V]](), + } +} + +// Get gets a value by a Handle. +func (m *MemAwareHandleMap[V]) Get(h Handle) (v V, ok bool) { + if h.IsInt() { + v, ok = m.ints.Get(h.IntValue()) + } else { + var strVal strHandleValue[V] + strVal, ok = m.strs.Get(string(h.Encoded())) + v = strVal.val + } + return +} + +// Set sets a value with a Handle. +func (m *MemAwareHandleMap[V]) Set(h Handle, val V) int64 { + if h.IsInt() { + return m.ints.Set(h.IntValue(), val) + } + return m.strs.Set(string(h.Encoded()), strHandleValue[V]{ + h: h, + val: val, + }) +} + +// Range iterates the MemAwareHandleMap with fn, the fn returns true to continue, returns false to stop. +func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val interface{}) bool) { + for h, val := range m.ints.M { + if !fn(IntHandle(h), val) { + return + } + } + for _, strVal := range m.strs.M { + if !fn(strVal.h, strVal.val) { + return + } + } +} + // PartitionHandle combines a handle and a PartitionID, used to location a row in partitioned table. // Now only used in global index. // TODO: support PartitionHandle in HandleMap. @@ -470,6 +548,11 @@ func (ph PartitionHandle) Compare(h Handle) int { } // MemUsage implements the Handle interface. -func (ph PartitionHandle) MemUsage() int64 { +func (ph PartitionHandle) MemUsage() uint64 { return ph.Handle.MemUsage() + 8 } + +// ExtraMemSize implements the Handle interface. +func (ph PartitionHandle) ExtraMemSize() uint64 { + return ph.Handle.ExtraMemSize() +} diff --git a/kv/key_test.go b/kv/key_test.go index 3d3ee3ce5fb1a..af45999d5e5bc 100644 --- a/kv/key_test.go +++ b/kv/key_test.go @@ -17,6 +17,7 @@ package kv_test import ( "bytes" "errors" + "strconv" "testing" "time" @@ -221,3 +222,83 @@ func BenchmarkIsPoint(b *testing.B) { kr.IsPoint() } } + +var result int + +var inputs = []struct { + input int +}{ + {input: 1}, + {input: 100}, + {input: 10000}, + {input: 1000000}, +} + +func memAwareIntMap(size int, handles []Handle) int { + var x int + m := NewMemAwareHandleMap[int]() + for j := 0; j < size; j++ { + m.Set(handles[j], j) + } + for j := 0; j < size; j++ { + x, _ = m.Get(handles[j]) + } + return x +} + +func nativeIntMap(size int, handles []Handle) int { + var x int + m := make(map[Handle]int) + for j := 0; j < size; j++ { + m[handles[j]] = j + } + + for j := 0; j < size; j++ { + x = m[handles[j]] + } + return x +} + +func BenchmarkMemAwareHandleMap(b *testing.B) { + var sc stmtctx.StatementContext + for _, s := range inputs { + handles := make([]Handle, s.input) + for i := 0; i < s.input; i++ { + if i%2 == 0 { + handles[i] = IntHandle(i) + } else { + handleBytes, _ := codec.EncodeKey(&sc, nil, types.NewIntDatum(int64(i))) + handles[i], _ = NewCommonHandle(handleBytes) + } + } + b.Run("MemAwareIntMap_"+strconv.Itoa(s.input), func(b *testing.B) { + var x int + for i := 0; i < b.N; i++ { + x = memAwareIntMap(s.input, handles) + } + result = x + }) + } +} + +func BenchmarkNativeHandleMap(b *testing.B) { + var sc stmtctx.StatementContext + for _, s := range inputs { + handles := make([]Handle, s.input) + for i := 0; i < s.input; i++ { + if i%2 == 0 { + handles[i] = IntHandle(i) + } else { + handleBytes, _ := codec.EncodeKey(&sc, nil, types.NewIntDatum(int64(i))) + handles[i], _ = NewCommonHandle(handleBytes) + } + } + b.Run("NativeIntMap_"+strconv.Itoa(s.input), func(b *testing.B) { + var x int + for i := 0; i < b.N; i++ { + x = nativeIntMap(s.input, handles) + } + result = x + }) + } +} diff --git a/statistics/row_sampler.go b/statistics/row_sampler.go index d091f2be818fb..c80af0b980c79 100644 --- a/statistics/row_sampler.go +++ b/statistics/row_sampler.go @@ -78,7 +78,7 @@ func (i ReservoirRowSampleItem) MemUsage() (sum int64) { sum += col.MemUsage() } if i.Handle != nil { - sum += i.Handle.MemUsage() + sum += int64(i.Handle.MemUsage()) } return sum } diff --git a/util/hack/hack.go b/util/hack/hack.go index f4a2f79f894ab..8e586485ee9bd 100644 --- a/util/hack/hack.go +++ b/util/hack/hack.go @@ -76,3 +76,8 @@ const ( // DefBucketMemoryUsageForSetInt64 = bucketSize*(1+unsafe.Sizeof(int64) + unsafe.Sizeof(struct{}))+2*ptrSize DefBucketMemoryUsageForSetInt64 = (8*(1+8+0) + 16) / 2 * 3 ) + +// EstimateBucketMemoryUsage returns the estimated memory usage of a bucket in a map. +func EstimateBucketMemoryUsage[K comparable, V any]() uint64 { + return (8*(1+uint64(unsafe.Sizeof(*new(K))+unsafe.Sizeof(*new(V)))) + 16) / 2 * 3 +} diff --git a/util/set/mem_aware_map.go b/util/set/mem_aware_map.go new file mode 100644 index 0000000000000..da1cb227af306 --- /dev/null +++ b/util/set/mem_aware_map.go @@ -0,0 +1,72 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set + +import ( + "math" + + "github.com/pingcap/tidb/util/hack" +) + +// MemAwareMap is a map which is aware of its memory usage. It's adapted from SetWithMemoryUsage. +// It doesn't support delete. +// The estimate usage of memory is usually smaller than the real usage. +// According to experiments with SetWithMemoryUsage, 2/3 * estimated usage <= real usage <= estimated usage. +type MemAwareMap[K comparable, V any] struct { + M map[K]V // it's public, when callers want to directly access it, e.g. use in a for-range-loop + bInMap int64 + bucketMemoryUsage uint64 +} + +// EstimateMapSize returns the estimated size of the map. It doesn't include the dynamic part, e.g. objects pointed to by pointers in the map. +// len(map) <= load_factor * 2^bInMap. bInMap = ceil(log2(len(map)/load_factor)). +// memory = bucketSize * 2^bInMap +func EstimateMapSize(length int, bucketSize uint64) uint64 { + if length == 0 { + return 0 + } + bInMap := uint64(math.Ceil(math.Log2(float64(length) * hack.LoadFactorDen / hack.LoadFactorNum))) + return bucketSize * uint64(1< (1<