Skip to content

Commit

Permalink
*: track the memory usage in Insert/Update/Delete executors (#34097)
Browse files Browse the repository at this point in the history
close #34096
  • Loading branch information
ekexium authored Jun 22, 2022
1 parent fc833a3 commit 95e13af
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 20 deletions.
15 changes: 11 additions & 4 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,26 @@ func (e *DeleteExec) doBatchDelete(ctx context.Context) error {
}

func (e *DeleteExec) composeTblRowMap(tblRowMap tableRowMapType, colPosInfos []plannercore.TblColPosInfo, joinedRow []types.Datum) error {
// iterate all the joined tables, and got the copresonding rows in joinedRow.
// iterate all the joined tables, and got the corresponding rows in joinedRow.
for _, info := range colPosInfos {
if unmatchedOuterRow(info, joinedRow) {
continue
}
if tblRowMap[info.TblID] == nil {
tblRowMap[info.TblID] = kv.NewHandleMap()
tblRowMap[info.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]()
}
handle, err := info.HandleCols.BuildHandleByDatums(joinedRow)
if err != nil {
return err
}
// tblRowMap[info.TblID][handle] hold the row datas binding to this table and this handle.
tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End])
_, exist := tblRowMap[info.TblID].Get(handle)
memDelta := tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End])
if !exist {
memDelta += types.EstimatedMemUsage(joinedRow, 1)
memDelta += int64(handle.ExtraMemSize())
}
e.memTracker.Consume(memDelta)
}
return nil
}
Expand Down Expand Up @@ -240,6 +246,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl

// Close implements the Executor Close interface.
func (e *DeleteExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand All @@ -254,4 +261,4 @@ func (e *DeleteExec) Open(ctx context.Context) error {
// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
// the key in map[int64]Row is the joined table handle, which represent a unique reference row.
// the value in map[int64]Row is the deleting row.
type tableRowMapType map[int64]*kv.HandleMap
type tableRowMapType map[int64]*kv.MemAwareHandleMap[[]types.Datum]
29 changes: 19 additions & 10 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type UpdateExec struct {

// updatedRowKeys is a map for unique (TableAlias, handle) pair.
// The value is true if the row is changed, or false otherwise
updatedRowKeys map[int]*kv.HandleMap
updatedRowKeys map[int]*kv.MemAwareHandleMap[bool]
tblID2table map[int64]table.Table
// mergedRowData is a map for unique (Table, handle) pair.
// The value is cached table row
mergedRowData map[int64]*kv.HandleMap
mergedRowData map[int64]*kv.MemAwareHandleMap[[]types.Datum]
multiUpdateOnSameTable map[int64]bool

matched uint64 // a counter of matched rows during update
Expand All @@ -71,15 +71,15 @@ type UpdateExec struct {
// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
func (e *UpdateExec) prepare(row []types.Datum) (err error) {
if e.updatedRowKeys == nil {
e.updatedRowKeys = make(map[int]*kv.HandleMap)
e.updatedRowKeys = make(map[int]*kv.MemAwareHandleMap[bool])
}
e.handles = e.handles[:0]
e.tableUpdatable = e.tableUpdatable[:0]
e.changed = e.changed[:0]
e.matches = e.matches[:0]
for _, content := range e.tblColPosInfos {
if e.updatedRowKeys[content.Start] == nil {
e.updatedRowKeys[content.Start] = kv.NewHandleMap()
e.updatedRowKeys[content.Start] = kv.NewMemAwareHandleMap[bool]()
}
handle, err := content.HandleCols.BuildHandleByDatums(row)
if err != nil {
Expand All @@ -102,7 +102,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) {

changed, ok := e.updatedRowKeys[content.Start].Get(handle)
if ok {
e.changed = append(e.changed, changed.(bool))
e.changed = append(e.changed, changed)
e.matches = append(e.matches, false)
} else {
e.changed = append(e.changed, false)
Expand All @@ -114,7 +114,7 @@ func (e *UpdateExec) prepare(row []types.Datum) (err error) {

func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) error {
if e.mergedRowData == nil {
e.mergedRowData = make(map[int64]*kv.HandleMap)
e.mergedRowData = make(map[int64]*kv.MemAwareHandleMap[[]types.Datum])
}
var mergedData []types.Datum
// merge updates from and into mergedRowData
Expand All @@ -135,13 +135,13 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
flags := e.assignFlag[content.Start:content.End]

if e.mergedRowData[content.TblID] == nil {
e.mergedRowData[content.TblID] = kv.NewHandleMap()
e.mergedRowData[content.TblID] = kv.NewMemAwareHandleMap[[]types.Datum]()
}
tbl := e.tblID2table[content.TblID]
oldData := row[content.Start:content.End]
newTableData := newData[content.Start:content.End]
if v, ok := e.mergedRowData[content.TblID].Get(handle); ok {
mergedData = v.([]types.Datum)
mergedData = v
for i, flag := range flags {
if tbl.WritableCols()[i].IsGenerated() != mergeGenerated {
continue
Expand All @@ -156,7 +156,10 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
} else {
mergedData = append([]types.Datum{}, newTableData...)
}
e.mergedRowData[content.TblID].Set(handle, mergedData)

memDelta := e.mergedRowData[content.TblID].Set(handle, mergedData)
memDelta += types.EstimatedMemUsage(mergedData, 1) + int64(handle.ExtraMemSize())
e.memTracker.Consume(memDelta)
}
return nil
}
Expand Down Expand Up @@ -190,7 +193,12 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n
// Update row
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker)
if err1 == nil {
e.updatedRowKeys[content.Start].Set(handle, changed)
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
if !exist {
memDelta += int64(handle.ExtraMemSize())
}
e.memTracker.Consume(memDelta)
continue
}

Expand Down Expand Up @@ -426,6 +434,7 @@ func (e *UpdateExec) Close() error {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil)
}
}
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand Down
93 changes: 88 additions & 5 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/set"
)

// Key represents high-level Key type.
Expand Down Expand Up @@ -158,9 +159,15 @@ type Handle interface {
// String implements the fmt.Stringer interface.
String() string
// MemUsage returns the memory usage of a handle.
MemUsage() int64
MemUsage() uint64
// ExtraMemSize returns the memory usage of objects that are pointed to by the Handle.
ExtraMemSize() uint64
}

var _ Handle = IntHandle(0)
var _ Handle = &CommonHandle{}
var _ Handle = PartitionHandle{}

// IntHandle implement the Handle interface for int64 type handle.
type IntHandle int64

Expand Down Expand Up @@ -231,10 +238,15 @@ func (ih IntHandle) String() string {
}

// MemUsage implements the Handle interface.
func (ih IntHandle) MemUsage() int64 {
func (ih IntHandle) MemUsage() uint64 {
return 8
}

// ExtraMemSize implements the Handle interface.
func (ih IntHandle) ExtraMemSize() uint64 {
return 0
}

// CommonHandle implements the Handle interface for non-int64 type handle.
type CommonHandle struct {
encoded []byte
Expand Down Expand Up @@ -355,8 +367,15 @@ func (ch *CommonHandle) String() string {
}

// MemUsage implements the Handle interface.
func (ch *CommonHandle) MemUsage() int64 {
return int64(cap(ch.encoded)) + int64(cap(ch.colEndOffsets))*2
func (ch *CommonHandle) MemUsage() uint64 {
// 48 is used by the 2 slice fields.
return 48 + ch.ExtraMemSize()
}

// ExtraMemSize implements the Handle interface.
func (ch *CommonHandle) ExtraMemSize() uint64 {
// colEndOffsets is a slice of uint16.
return uint64(cap(ch.encoded) + cap(ch.colEndOffsets)*2)
}

// HandleMap is the map for Handle.
Expand Down Expand Up @@ -431,6 +450,65 @@ func (m *HandleMap) Range(fn func(h Handle, val interface{}) bool) {
}
}

// MemAwareHandleMap is similar to HandleMap, but it's aware of its memory usage and doesn't support delete.
// It only tracks the actual sizes. Objects that are pointed to by the key or value are not tracked.
// Those should be tracked by the caller.
type MemAwareHandleMap[V any] struct {
ints set.MemAwareMap[int64, V]
strs set.MemAwareMap[string, strHandleValue[V]]
}

type strHandleValue[V any] struct {
h Handle
val V
}

// NewMemAwareHandleMap creates a new map for handle.
func NewMemAwareHandleMap[V any]() *MemAwareHandleMap[V] {
// Initialize the two maps to avoid checking nil.
return &MemAwareHandleMap[V]{
ints: set.NewMemAwareMap[int64, V](),
strs: set.NewMemAwareMap[string, strHandleValue[V]](),
}
}

// Get gets a value by a Handle.
func (m *MemAwareHandleMap[V]) Get(h Handle) (v V, ok bool) {
if h.IsInt() {
v, ok = m.ints.Get(h.IntValue())
} else {
var strVal strHandleValue[V]
strVal, ok = m.strs.Get(string(h.Encoded()))
v = strVal.val
}
return
}

// Set sets a value with a Handle.
func (m *MemAwareHandleMap[V]) Set(h Handle, val V) int64 {
if h.IsInt() {
return m.ints.Set(h.IntValue(), val)
}
return m.strs.Set(string(h.Encoded()), strHandleValue[V]{
h: h,
val: val,
})
}

// Range iterates the MemAwareHandleMap with fn, the fn returns true to continue, returns false to stop.
func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val interface{}) bool) {
for h, val := range m.ints.M {
if !fn(IntHandle(h), val) {
return
}
}
for _, strVal := range m.strs.M {
if !fn(strVal.h, strVal.val) {
return
}
}
}

// PartitionHandle combines a handle and a PartitionID, used to location a row in partitioned table.
// Now only used in global index.
// TODO: support PartitionHandle in HandleMap.
Expand Down Expand Up @@ -470,6 +548,11 @@ func (ph PartitionHandle) Compare(h Handle) int {
}

// MemUsage implements the Handle interface.
func (ph PartitionHandle) MemUsage() int64 {
func (ph PartitionHandle) MemUsage() uint64 {
return ph.Handle.MemUsage() + 8
}

// ExtraMemSize implements the Handle interface.
func (ph PartitionHandle) ExtraMemSize() uint64 {
return ph.Handle.ExtraMemSize()
}
81 changes: 81 additions & 0 deletions kv/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv_test
import (
"bytes"
"errors"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -221,3 +222,83 @@ func BenchmarkIsPoint(b *testing.B) {
kr.IsPoint()
}
}

var result int

var inputs = []struct {
input int
}{
{input: 1},
{input: 100},
{input: 10000},
{input: 1000000},
}

func memAwareIntMap(size int, handles []Handle) int {
var x int
m := NewMemAwareHandleMap[int]()
for j := 0; j < size; j++ {
m.Set(handles[j], j)
}
for j := 0; j < size; j++ {
x, _ = m.Get(handles[j])
}
return x
}

func nativeIntMap(size int, handles []Handle) int {
var x int
m := make(map[Handle]int)
for j := 0; j < size; j++ {
m[handles[j]] = j
}

for j := 0; j < size; j++ {
x = m[handles[j]]
}
return x
}

func BenchmarkMemAwareHandleMap(b *testing.B) {
var sc stmtctx.StatementContext
for _, s := range inputs {
handles := make([]Handle, s.input)
for i := 0; i < s.input; i++ {
if i%2 == 0 {
handles[i] = IntHandle(i)
} else {
handleBytes, _ := codec.EncodeKey(&sc, nil, types.NewIntDatum(int64(i)))
handles[i], _ = NewCommonHandle(handleBytes)
}
}
b.Run("MemAwareIntMap_"+strconv.Itoa(s.input), func(b *testing.B) {
var x int
for i := 0; i < b.N; i++ {
x = memAwareIntMap(s.input, handles)
}
result = x
})
}
}

func BenchmarkNativeHandleMap(b *testing.B) {
var sc stmtctx.StatementContext
for _, s := range inputs {
handles := make([]Handle, s.input)
for i := 0; i < s.input; i++ {
if i%2 == 0 {
handles[i] = IntHandle(i)
} else {
handleBytes, _ := codec.EncodeKey(&sc, nil, types.NewIntDatum(int64(i)))
handles[i], _ = NewCommonHandle(handleBytes)
}
}
b.Run("NativeIntMap_"+strconv.Itoa(s.input), func(b *testing.B) {
var x int
for i := 0; i < b.N; i++ {
x = nativeIntMap(s.input, handles)
}
result = x
})
}
}
2 changes: 1 addition & 1 deletion statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (i ReservoirRowSampleItem) MemUsage() (sum int64) {
sum += col.MemUsage()
}
if i.Handle != nil {
sum += i.Handle.MemUsage()
sum += int64(i.Handle.MemUsage())
}
return sum
}
Expand Down
5 changes: 5 additions & 0 deletions util/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,8 @@ const (
// DefBucketMemoryUsageForSetInt64 = bucketSize*(1+unsafe.Sizeof(int64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefBucketMemoryUsageForSetInt64 = (8*(1+8+0) + 16) / 2 * 3
)

// EstimateBucketMemoryUsage returns the estimated memory usage of a bucket in a map.
func EstimateBucketMemoryUsage[K comparable, V any]() uint64 {
return (8*(1+uint64(unsafe.Sizeof(*new(K))+unsafe.Sizeof(*new(V)))) + 16) / 2 * 3
}
Loading

0 comments on commit 95e13af

Please sign in to comment.