Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: track the memory usage in Insert/Update/Delete executors #34097

Merged
merged 23 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,26 @@ func (e *DeleteExec) doBatchDelete(ctx context.Context) error {
}

func (e *DeleteExec) composeTblRowMap(tblRowMap tableRowMapType, colPosInfos []plannercore.TblColPosInfo, joinedRow []types.Datum) error {
// iterate all the joined tables, and got the copresonding rows in joinedRow.
// iterate all the joined tables, and got the corresponding rows in joinedRow.
for _, info := range colPosInfos {
if unmatchedOuterRow(info, joinedRow) {
continue
}
if tblRowMap[info.TblID] == nil {
tblRowMap[info.TblID] = kv.NewHandleMap()
tblRowMap[info.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]()
}
handle, err := info.HandleCols.BuildHandleByDatums(joinedRow)
if err != nil {
return err
}
// tblRowMap[info.TblID][handle] hold the row datas binding to this table and this handle.
tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End])
_, exist := tblRowMap[info.TblID].Get(handle)
memDelta := tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End])
if !exist {
memDelta += types.EstimatedMemUsage(joinedRow, 1)
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved
memDelta += int64(handle.ExtraMemSize())
}
e.memTracker.Consume(memDelta)
}
return nil
}
Expand Down Expand Up @@ -240,6 +246,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl

// Close implements the Executor Close interface.
func (e *DeleteExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand All @@ -254,4 +261,4 @@ func (e *DeleteExec) Open(ctx context.Context) error {
// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
// the key in map[int64]Row is the joined table handle, which represent a unique reference row.
// the value in map[int64]Row is the deleting row.
type tableRowMapType map[int64]*kv.HandleMap
type tableRowMapType map[int64]*kv.MemAwareHandleMap[[]types.Datum]
29 changes: 19 additions & 10 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type UpdateExec struct {

// updatedRowKeys is a map for unique (TableAlias, handle) pair.
// The value is true if the row is changed, or false otherwise
updatedRowKeys map[int]*kv.HandleMap
updatedRowKeys map[int]*kv.MemAwareHandleMap[bool]
tblID2table map[int64]table.Table
// mergedRowData is a map for unique (Table, handle) pair.
// The value is cached table row
mergedRowData map[int64]*kv.HandleMap
mergedRowData map[int64]*kv.MemAwareHandleMap[[]types.Datum]
multiUpdateOnSameTable map[int64]bool

matched uint64 // a counter of matched rows during update
Expand All @@ -71,15 +71,15 @@ type UpdateExec struct {
// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
func (e *UpdateExec) prepare(row []types.Datum) (err error) {
if e.updatedRowKeys == nil {
e.updatedRowKeys = make(map[int]*kv.HandleMap)
e.updatedRowKeys = make(map[int]*kv.MemAwareHandleMap[bool])
}
e.handles = e.handles[:0]
e.tableUpdatable = e.tableUpdatable[:0]
e.changed = e.changed[:0]
e.matches = e.matches[:0]
for _, content := range e.tblColPosInfos {
if e.updatedRowKeys[content.Start] == nil {
e.updatedRowKeys[content.Start] = kv.NewHandleMap()
e.updatedRowKeys[content.Start] = kv.NewMemAwareHandleMap[bool]()
}
handle, err := content.HandleCols.BuildHandleByDatums(row)
if err != nil {
Expand All @@ -102,7 +102,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) {

changed, ok := e.updatedRowKeys[content.Start].Get(handle)
if ok {
e.changed = append(e.changed, changed.(bool))
e.changed = append(e.changed, changed)
e.matches = append(e.matches, false)
} else {
e.changed = append(e.changed, false)
Expand All @@ -114,7 +114,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) {

func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) error {
if e.mergedRowData == nil {
e.mergedRowData = make(map[int64]*kv.HandleMap)
e.mergedRowData = make(map[int64]*kv.MemAwareHandleMap[[]types.Datum])
}
var mergedData []types.Datum
// merge updates from and into mergedRowData
Expand All @@ -135,13 +135,13 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
flags := e.assignFlag[content.Start:content.End]

if e.mergedRowData[content.TblID] == nil {
e.mergedRowData[content.TblID] = kv.NewHandleMap()
e.mergedRowData[content.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]()
}
tbl := e.tblID2table[content.TblID]
oldData := row[content.Start:content.End]
newTableData := newData[content.Start:content.End]
if v, ok := e.mergedRowData[content.TblID].Get(handle); ok {
mergedData = v.([]types.Datum)
mergedData = v
for i, flag := range flags {
if tbl.WritableCols()[i].IsGenerated() != mergeGenerated {
continue
Expand All @@ -156,7 +156,10 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
} else {
mergedData = append([]types.Datum{}, newTableData...)
}
e.mergedRowData[content.TblID].Set(handle, mergedData)

memDelta := e.mergedRowData[content.TblID].Set(handle, mergedData)
memDelta += types.EstimatedMemUsage(mergedData, 1) + int64(handle.ExtraMemSize())
e.memTracker.Consume(memDelta)
}
return nil
}
Expand Down Expand Up @@ -190,7 +193,12 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n
// Update row
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker)
if err1 == nil {
e.updatedRowKeys[content.Start].Set(handle, changed)
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
if !exist {
memDelta += int64(handle.ExtraMemSize())
}
e.memTracker.Consume(memDelta)
continue
}

Expand Down Expand Up @@ -426,6 +434,7 @@ func (e *UpdateExec) Close() error {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil)
}
}
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand Down
92 changes: 87 additions & 5 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/set"
)

// Key represents high-level Key type.
Expand Down Expand Up @@ -158,9 +159,15 @@ type Handle interface {
// String implements the fmt.Stringer interface.
String() string
// MemUsage returns the memory usage of a handle.
MemUsage() int64
MemUsage() uint64
// ExtraMemSize returns the dynamic size of memory occupied by the handle, e.g. slices.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does dynamic mean? How do we use this func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dynamic means the part of memory is not a fixed number, i.e. slices or pointers. Do you have any suggestions on improving the comment?
We use it with the mem-aware map, since the map only counts the actual size of the keys and values, but doesn't contain the objects that are referenced by pointers.

ExtraMemSize() uint64
}

var _ Handle = IntHandle(0)
var _ Handle = &CommonHandle{}
var _ Handle = PartitionHandle{}

// IntHandle implement the Handle interface for int64 type handle.
type IntHandle int64

Expand Down Expand Up @@ -231,10 +238,15 @@ func (ih IntHandle) String() string {
}

// MemUsage implements the Handle interface.
func (ih IntHandle) MemUsage() int64 {
func (ih IntHandle) MemUsage() uint64 {
return 8
}

// ExtraMemSize implements the Handle interface.
func (ih IntHandle) ExtraMemSize() uint64 {
return 0
}

// CommonHandle implements the Handle interface for non-int64 type handle.
type CommonHandle struct {
encoded []byte
Expand Down Expand Up @@ -355,8 +367,14 @@ func (ch *CommonHandle) String() string {
}

// MemUsage implements the Handle interface.
func (ch *CommonHandle) MemUsage() int64 {
return int64(cap(ch.encoded)) + int64(cap(ch.colEndOffsets))*2
func (ch *CommonHandle) MemUsage() uint64 {
// 48 is used by the 2 slice fields.
return 48 + ch.ExtraMemSize()
ekexium marked this conversation as resolved.
Show resolved Hide resolved
}

// ExtraMemSize implements the Handle interface.
func (ch *CommonHandle) ExtraMemSize() uint64 {
return uint64(len(ch.encoded) + len(ch.colEndOffsets)*2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.cap(ch.encoded) and cap(ch.colEndOffsets)?
2. Why do we need to *2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

colEndOffsets is an array of uint16.

}

// HandleMap is the map for Handle.
Expand Down Expand Up @@ -431,6 +449,65 @@ func (m *HandleMap) Range(fn func(h Handle, val interface{}) bool) {
}
}

// MemAwareHandleMap is similar to HandleMap, but it's aware of its memory usage and doesn't support delete.
// It only tracks the actual sizes. Objects that are pointed to by the key or value are not tracked.
// Those should be tracked by the caller.
type MemAwareHandleMap[V any] struct {
ints set.MemAwareMap[int64, V]
strs set.MemAwareMap[string, strHandleValue[V]]
}

type strHandleValue[V any] struct {
h Handle
val V
}

// NewMemAwareHandleMap creates a new map for handle.
func NewMemAwareHandleMap[V any]() *MemAwareHandleMap[V] {
// Initialize the two maps to avoid checking nil.
return &MemAwareHandleMap[V]{
ints: set.NewMemAwareMap[int64, V](),
strs: set.NewMemAwareMap[string, strHandleValue[V]](),
}
}

// Get gets a value by a Handle.
func (m *MemAwareHandleMap[V]) Get(h Handle) (v V, ok bool) {
if h.IsInt() {
v, ok = m.ints.Get(h.IntValue())
} else {
var strVal strHandleValue[V]
strVal, ok = m.strs.Get(string(h.Encoded()))
v = strVal.val
}
return
}

// Set sets a value with a Handle.
func (m *MemAwareHandleMap[V]) Set(h Handle, val V) int64 {
if h.IsInt() {
return m.ints.Set(h.IntValue(), val)
}
return m.strs.Set(string(h.Encoded()), strHandleValue[V]{
h: h,
val: val,
})
}

// Range iterates the MemAwareHandleMap with fn, the fn returns true to continue, returns false to stop.
func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val interface{}) bool) {
for h, val := range m.ints.M {
if !fn(IntHandle(h), val) {
return
}
}
for _, strVal := range m.strs.M {
if !fn(strVal.h, strVal.val) {
return
}
}
}

// PartitionHandle combines a handle and a PartitionID, used to location a row in partitioned table.
// Now only used in global index.
// TODO: support PartitionHandle in HandleMap.
Expand Down Expand Up @@ -470,6 +547,11 @@ func (ph PartitionHandle) Compare(h Handle) int {
}

// MemUsage implements the Handle interface.
func (ph PartitionHandle) MemUsage() int64 {
func (ph PartitionHandle) MemUsage() uint64 {
return ph.Handle.MemUsage() + 8
}

// ExtraMemUsage implements the Handle interface.
func (ph PartitionHandle) ExtraMemUsage() uint64 {
ekexium marked this conversation as resolved.
Show resolved Hide resolved
return ph.Handle.ExtraMemSize()
}
2 changes: 1 addition & 1 deletion statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (i ReservoirRowSampleItem) MemUsage() (sum int64) {
sum += col.MemUsage()
}
if i.Handle != nil {
sum += i.Handle.MemUsage()
sum += int64(i.Handle.MemUsage())
}
return sum
}
Expand Down
5 changes: 5 additions & 0 deletions util/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,8 @@ const (
// DefBucketMemoryUsageForSetInt64 = bucketSize*(1+unsafe.Sizeof(int64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefBucketMemoryUsageForSetInt64 = (8*(1+8+0) + 16) / 2 * 3
)

// EstimateBucketMemoryUsage returns the estimated memory usage of a bucket in a map.
func EstimateBucketMemoryUsage[K comparable, V any]() uint64 {
return (8*(1+uint64(unsafe.Sizeof(*new(K))+unsafe.Sizeof(*new(V)))) + 16) / 2 * 3
}
72 changes: 72 additions & 0 deletions util/set/mem_aware_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an individual package like map or tiMap to put these files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map and set are highly related. And map is reserved so I think leaving them in set is fine.


import (
"math"

"github.com/pingcap/tidb/util/hack"
)

// MemAwareMap is a map which is aware of its memory usage. It's adapted from SetWithMemoryUsage.
// It doesn't support delete.
// The estimate usage of memory is usually smaller than the real usage.
// According to experiments with SetWithMemoryUsage, in worst case the maximum bias is 50%, i.e. real usage <= 1.5 * estimated usage.
ekexium marked this conversation as resolved.
Show resolved Hide resolved
type MemAwareMap[K comparable, V any] struct {
M map[K]V // it's public, when callers want to directly access it, e.g. use in a for-range-loop
bInMap int64
bucketMemoryUsage uint64
}

// EstimateMapSize returns the estimated size of the map. It doesn't include the dynamic part, e.g. objects pointed to by pointers in the map.
// len(map) <= load_factor * 2^bInMap. bInMap = ceil(log2(len(map)/load_factor)).
// memory = bucketSize * 2^bInMap
func EstimateMapSize(length int, bucketSize uint64) uint64 {
if length == 0 {
return 0
}
bInMap := uint64(math.Ceil(math.Log2(float64(length) * hack.LoadFactorDen / hack.LoadFactorNum)))
return bucketSize * uint64(1<<bInMap)
}
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved

// NewMemAwareMap creates a new MemAwareMap.
func NewMemAwareMap[K comparable, V any]() MemAwareMap[K, V] {
return MemAwareMap[K, V]{
M: make(map[K]V),
bInMap: 0,
bucketMemoryUsage: hack.EstimateBucketMemoryUsage[K, V](),
}
}

// Get the value of the key.
func (m *MemAwareMap[K, V]) Get(k K) (v V, ok bool) {
v, ok = m.M[k]
return
}

// Set the value of the key.
func (m *MemAwareMap[K, V]) Set(k K, v V) (memDelta int64) {
m.M[k] = v
if len(m.M) > (1<<m.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = int64(m.bucketMemoryUsage * (1 << m.bInMap))
m.bInMap++
}
return memDelta
}

// Len returns the number of elements in the map.
func (m *MemAwareMap[K, V]) Len() int {
return len(m.M)
}
Loading