Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*:Record the time consuming of memory operation of Insert Executor in Runtime Information #19574

Merged
merged 54 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d4d75a8
init
jyz0309 Aug 28, 2020
7b7b7c8
update trace
jyz0309 Aug 28, 2020
c8287b9
update trace batchCheckAndInsert
jyz0309 Aug 28, 2020
278fc96
check-dev
jyz0309 Aug 28, 2020
6112068
update comment
jyz0309 Aug 28, 2020
ffec1b5
init
jyz0309 Aug 28, 2020
27b4732
update the duration about check and insert
jyz0309 Aug 28, 2020
09959fa
test and make ci good
jyz0309 Aug 28, 2020
5bc2779
get line
jyz0309 Aug 28, 2020
b295c62
init
jyz0309 Sep 1, 2020
b76a420
debug and pass the unit-test
jyz0309 Sep 2, 2020
9182f03
pass the test
jyz0309 Sep 2, 2020
abd58cf
init
jyz0309 Sep 3, 2020
a9bfd2f
init
jyz0309 Sep 8, 2020
9a5dc57
add insertRuntimeStat
jyz0309 Sep 8, 2020
b1a7704
pass check_dev
jyz0309 Sep 9, 2020
56179e3
add GetTime func
jyz0309 Sep 9, 2020
c945b83
avoid panic
jyz0309 Sep 9, 2020
841d1ad
init stats try
jyz0309 Sep 10, 2020
cd1f440
make ci good
jyz0309 Sep 10, 2020
23dc5eb
test before update
jyz0309 Sep 10, 2020
83054f5
test
jyz0309 Sep 10, 2020
c4254c1
Merge branch 'master' into issue-19512
crazycs520 Sep 10, 2020
087e24e
init
jyz0309 Sep 10, 2020
6821421
Merge branch 'issue-19512' of github.com:jyz0309/tidb into issue-19512
jyz0309 Sep 10, 2020
7482a8d
update time way
jyz0309 Sep 11, 2020
2c3bbc6
change var name
jyz0309 Sep 11, 2020
79f6265
Merge branch 'master' into issue-19512
jyz0309 Sep 11, 2020
9f948e3
init
jyz0309 Sep 11, 2020
0fb6abe
update test
jyz0309 Sep 11, 2020
a8aa56b
update
jyz0309 Sep 11, 2020
8229bdf
Merge branch 'master' into issue-19512
jyz0309 Sep 11, 2020
84ad780
make ci happy
jyz0309 Sep 11, 2020
c884b60
Merge branch 'issue-19512' of github.com:jyz0309/tidb into issue-19512
jyz0309 Sep 11, 2020
e2cc549
add method
jyz0309 Sep 12, 2020
20cd97d
add method
jyz0309 Sep 12, 2020
d169dcd
add test
jyz0309 Sep 13, 2020
d5678c7
remove useless line
jyz0309 Sep 13, 2020
fe83e3b
update the String()
jyz0309 Sep 15, 2020
29f5893
Merge branch 'master' into issue-19512
jyz0309 Sep 15, 2020
59cde1b
make ci happy
jyz0309 Sep 15, 2020
91e3483
update the insert ignore
jyz0309 Sep 15, 2020
519c894
Merge branch 'master' into issue-19512
jyz0309 Sep 15, 2020
d3fb5d1
optimize
jyz0309 Sep 16, 2020
4ab1d43
Merge branch 'issue-19512' of github.com:jyz0309/tidb into issue-19512
jyz0309 Sep 16, 2020
158a73f
update test
jyz0309 Sep 16, 2020
a5f4161
to avoid panic
jyz0309 Sep 17, 2020
29dde7c
change var name
jyz0309 Sep 17, 2020
ca4b1e4
Merge branch 'master' into issue-19512
crazycs520 Sep 17, 2020
a49c881
Merge branch 'master' into issue-19512
qw4990 Sep 17, 2020
7ad372e
add replace and insert ignore on duplicate key test
jyz0309 Sep 18, 2020
8b12410
Merge branch 'issue-19512' of github.com:jyz0309/tidb into issue-19512
jyz0309 Sep 18, 2020
648c7e6
Merge branch 'master' into issue-19512
crazycs520 Sep 21, 2020
ad377c1
Merge branch 'master' into issue-19512
qw4990 Sep 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6376,7 +6376,27 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values (9,9)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, BatchGet:{num_rpc:.*, total_time:.*}, lock_keys: {time:.*, region:.*, keys:.*, lock_rpc:.*, rpc_count:.*}")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (10,10) on duplicate key update a=a+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (1,2)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, insert:.*")
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values(11,11) on duplicate key update `a`=`a`+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:.*}")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("replace into t1 values (1,4)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prefetch:.*, rpc:.*")
tk.MustExec("rollback")
}

Expand Down
17 changes: 14 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -82,6 +83,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
} else {
e.collectRuntimeStatsEnabled()
start := time.Now()
for i, row := range rows {
var err error
sizeHintStep := int(sessVars.ShardAllocateStep)
Expand All @@ -99,6 +102,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
Expand Down Expand Up @@ -184,6 +190,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row t
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
start := time.Now()
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
Expand All @@ -200,13 +207,15 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
Expand Down Expand Up @@ -264,6 +273,9 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

Expand Down Expand Up @@ -333,7 +345,6 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
e.ctx.GetSessionVars().CurrInsertValues = e.curInsertVals.ToRow()

// NOTE: In order to execute the expression inside the column assignment,
// we have to put the value of "oldRow" before "newRow" in "row4Update" to
// be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
Expand Down
95 changes: 89 additions & 6 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package executor

import (
"bytes"
"context"
"fmt"
"math"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -31,6 +35,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand Down Expand Up @@ -74,7 +79,7 @@ type InsertValues struct {
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
stats *insertRuntimeStat
}

type defaultVal struct {
Expand Down Expand Up @@ -901,7 +906,6 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
if err != nil {
return 0, err
}

layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
Expand Down Expand Up @@ -933,8 +937,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
e.stats = &insertRuntimeStat{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
prefetch: 0,
checkInsertTime: 0,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
Expand All @@ -948,7 +955,12 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error {
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
start := time.Now()
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
if err != nil {
Expand All @@ -959,18 +971,20 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}

// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
Expand All @@ -997,6 +1011,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}
}

qw4990 marked this conversation as resolved.
Show resolved Hide resolved
// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
Expand All @@ -1008,6 +1023,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

Expand All @@ -1034,3 +1052,68 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
}
return nil
}

type insertRuntimeStat struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
checkInsertTime time.Duration
prefetch time.Duration
}

func (e *insertRuntimeStat) String() string {
if e.checkInsertTime == 0 {
// For replace statement.
if e.prefetch > 0 && e.SnapshotRuntimeStats != nil {
return fmt.Sprintf("prefetch: %v, rpc:{%v}", e.prefetch, e.SnapshotRuntimeStats.String())
}
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString(fmt.Sprintf("prepare:%v, ", time.Duration(e.BasicRuntimeStats.GetTime())-e.checkInsertTime))
if e.prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.checkInsertTime, e.checkInsertTime-e.prefetch, e.prefetch))
if e.SnapshotRuntimeStats != nil {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
}
buf.WriteString("}")
} else {
buf.WriteString(fmt.Sprintf("insert:%v", e.checkInsertTime))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (e *insertRuntimeStat) Clone() execdetails.RuntimeStats {
newRs := &insertRuntimeStat{
checkInsertTime: e.checkInsertTime,
prefetch: e.prefetch,
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (e *insertRuntimeStat) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*insertRuntimeStat)
if !ok {
return
}
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
} else {
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
}
}
e.prefetch += tmp.prefetch
e.checkInsertTime += tmp.checkInsertTime
}

// Tp implements the RuntimeStats interface.
func (e *insertRuntimeStat) Tp() int {
return execdetails.TpInsertRuntimeStat
}
7 changes: 5 additions & 2 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -200,13 +201,15 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch = time.Since(prefetchStart)
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (s *Scanner) startTS() uint64 {
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
val, err := s.snapshot.get(bo, current.Key)
ctx := context.Background()
val, err := s.snapshot.get(ctx, bo, current.Key)
if err != nil {
return errors.Trace(err)
}
Expand Down
15 changes: 7 additions & 8 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,19 +317,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll

// Get gets the value for key k from snapshot.
func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

defer func(start time.Time) {
tikvTxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
}(time.Now())

ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
val, err := s.get(bo, k)
val, err := s.get(ctx, bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -345,7 +340,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return val, nil
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
s.mu.RLock()
if s.mu.cached != nil {
Expand All @@ -356,7 +351,11 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}
}
s.mu.RUnlock()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
jyz0309 marked this conversation as resolved.
Show resolved Hide resolved
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
if bo.ctx.Value("TestSnapshotCache") != nil {
panic("cache miss")
Expand Down
7 changes: 7 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ const (
TpJoinRuntimeStats
// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
TpSelectResultRuntimeStats
// TpInsertRuntimeStat is the tp for InsertRuntimeStat
TpInsertRuntimeStat
)

// RuntimeStats is used to express the executor runtime information.
Expand Down Expand Up @@ -548,6 +550,11 @@ func (e *BasicRuntimeStats) String() string {
return fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop)
}

// GetTime get the int64 total time
func (e *BasicRuntimeStats) GetTime() int64 {
return e.consume
}

// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
mu sync.Mutex
Expand Down