
Commit bb354b0
*: Record the time consumed by memory operations of the Insert Executor in Runtime Information (#19574)
jyz0309 authored Sep 23, 2020
1 parent 92a04ef commit bb354b0
Showing 7 changed files with 145 additions and 21 deletions.
22 changes: 21 additions & 1 deletion executor/executor_test.go
@@ -6374,7 +6374,27 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values (9,9)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, BatchGet:{num_rpc:.*, total_time:.*}, lock_keys: {time:.*, region:.*, keys:.*, lock_rpc:.*, rpc_count:.*}")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (10,10) on duplicate key update a=a+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (1,2)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, insert:.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values(11,11) on duplicate key update `a`=`a`+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:.*}")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("replace into t1 values (1,4)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prefetch:.*, rpc:.*")
tk.MustExec("rollback")
}
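
For illustration only (not part of the commit): a standalone sketch of the kind of execution-info string the new Matches assertions are written against. The durations are invented; the shape follows the prepare/check_insert breakdown this commit adds.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Invented sample in the shape produced for an "insert ignore" statement.
	sample := "time:255.5µs, loops:1, prepare:65.4µs, " +
		"check_insert:{total_time:190.1µs, mem_insert_time:100µs, prefetch:90.1µs, " +
		"rpc:{BatchGet:{num_rpc:1, total_time:70µs}}}"
	// Pattern copied from the test above; Go's regexp treats these braces as literals.
	pattern := "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, " +
		"mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*"
	fmt.Println(regexp.MustCompile(pattern).MatchString(sample)) // prints: true
}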

17 changes: 14 additions & 3 deletions executor/insert.go
@@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
@@ -82,6 +83,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
} else {
e.collectRuntimeStatsEnabled()
start := time.Now()
for i, row := range rows {
var err error
sizeHintStep := int(sessVars.ShardAllocateStep)
@@ -99,6 +102,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
@@ -184,6 +190,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row t
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
start := time.Now()
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
@@ -200,13 +207,15 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
@@ -264,6 +273,9 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

@@ -333,7 +345,6 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
e.ctx.GetSessionVars().CurrInsertValues = e.curInsertVals.ToRow()

// NOTE: In order to execute the expression inside the column assignment,
// we have to put the value of "oldRow" before "newRow" in "row4Update" to
// be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
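
For illustration only (not part of the diff): a minimal standalone sketch of the timing pattern these insert.go hunks add — wrap the check-and-insert work in a time.Now()/time.Since pair and accumulate the elapsed time only when runtime stats are being collected (e.stats != nil). The type below is simplified; only the checkInsertTime field name mirrors the real insertRuntimeStat.

package main

import (
	"fmt"
	"time"
)

// simplifiedStats stands in for insertRuntimeStat in this sketch.
type simplifiedStats struct {
	checkInsertTime time.Duration
}

// execInsert mimics the instrumentation shape of InsertExec.exec / batchUpdateDupRows.
func execInsert(stats *simplifiedStats) {
	start := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for checking keys and writing to the membuffer
	if stats != nil {                // accumulate only when stats collection is enabled
		stats.checkInsertTime += time.Since(start)
	}
}

func main() {
	s := &simplifiedStats{}
	execInsert(s)
	execInsert(s) // durations accumulate across calls, matching the += above
	fmt.Println("check_insert total_time:", s.checkInsertTime)
}
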
95 changes: 89 additions & 6 deletions executor/insert_common.go
@@ -14,9 +14,13 @@
package executor

import (
"bytes"
"context"
"fmt"
"math"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
@@ -31,6 +35,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
@@ -74,7 +79,7 @@ type InsertValues struct {
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
stats *insertRuntimeStat
}

type defaultVal struct {
@@ -901,7 +906,6 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
if err != nil {
return 0, err
}

layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
@@ -933,8 +937,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
e.stats = &insertRuntimeStat{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
prefetch: 0,
checkInsertTime: 0,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
@@ -948,7 +955,12 @@
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
start := time.Now()
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
if err != nil {
@@ -959,18 +971,20 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}

// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
@@ -997,6 +1011,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}
}

// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
@@ -1008,6 +1023,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

@@ -1034,3 +1052,68 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
}
return nil
}

type insertRuntimeStat struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
checkInsertTime time.Duration
prefetch time.Duration
}

func (e *insertRuntimeStat) String() string {
if e.checkInsertTime == 0 {
// For replace statement.
if e.prefetch > 0 && e.SnapshotRuntimeStats != nil {
return fmt.Sprintf("prefetch: %v, rpc:{%v}", e.prefetch, e.SnapshotRuntimeStats.String())
}
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString(fmt.Sprintf("prepare:%v, ", time.Duration(e.BasicRuntimeStats.GetTime())-e.checkInsertTime))
if e.prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.checkInsertTime, e.checkInsertTime-e.prefetch, e.prefetch))
if e.SnapshotRuntimeStats != nil {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
}
buf.WriteString("}")
} else {
buf.WriteString(fmt.Sprintf("insert:%v", e.checkInsertTime))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (e *insertRuntimeStat) Clone() execdetails.RuntimeStats {
newRs := &insertRuntimeStat{
checkInsertTime: e.checkInsertTime,
prefetch: e.prefetch,
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (e *insertRuntimeStat) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*insertRuntimeStat)
if !ok {
return
}
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
} else {
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
}
}
e.prefetch += tmp.prefetch
e.checkInsertTime += tmp.checkInsertTime
}

// Tp implements the RuntimeStats interface.
func (e *insertRuntimeStat) Tp() int {
return execdetails.TpInsertRuntimeStat
}
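
For illustration only (not part of the diff): how the numbers reported by insertRuntimeStat.String() relate to each other — prepare is the executor's total basic time minus checkInsertTime, and mem_insert_time is checkInsertTime minus the prefetch (BatchGet) time. The durations below are invented.

package main

import (
	"fmt"
	"time"
)

func main() {
	total := 300 * time.Microsecond       // BasicRuntimeStats.GetTime(): the whole executor
	checkInsert := 190 * time.Microsecond // wraps the key checks plus the membuffer writes
	prefetch := 90 * time.Microsecond     // BatchGet prefetch inside check_insert

	prepare := total - checkInsert      // everything the executor did outside check_insert
	memInsert := checkInsert - prefetch // the pure in-memory insert work

	fmt.Printf("prepare:%v, check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v}\n",
		prepare, checkInsert, memInsert, prefetch)
}
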
7 changes: 5 additions & 2 deletions executor/replace.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
@@ -200,13 +201,15 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch = time.Since(prefetchStart)
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
3 changes: 2 additions & 1 deletion store/tikv/scan.go
@@ -144,7 +144,8 @@ func (s *Scanner) startTS() uint64 {
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
val, err := s.snapshot.get(bo, current.Key)
ctx := context.Background()
val, err := s.snapshot.get(ctx, bo, current.Key)
if err != nil {
return errors.Trace(err)
}
15 changes: 7 additions & 8 deletions store/tikv/snapshot.go
@@ -317,19 +317,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll

// Get gets the value for key k from snapshot.
func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

defer func(start time.Time) {
tikvTxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
}(time.Now())

ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
val, err := s.get(bo, k)
val, err := s.get(ctx, bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
@@ -345,7 +340,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return val, nil
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
s.mu.RLock()
if s.mu.cached != nil {
@@ -356,7 +351,11 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}
}
s.mu.RUnlock()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
if bo.ctx.Value("TestSnapshotCache") != nil {
panic("cache miss")
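
For illustration only (not TiDB code): the guarded child-span pattern used here — tikvSnapshot.get now receives the caller's context and opens a "tikvSnapshot.get" span only when a parent span is active, and batchCheckAndInsert gains the same guard. The sketch below uses opentracing-go's mock tracer; in the sketch the context returned by ContextWithSpan is kept so later calls would carry the child span.

package main

import (
	"context"
	"fmt"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

func get(ctx context.Context) {
	// Guarded child-span creation, mirroring the shape of the code above.
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1) // keep the returned ctx for downstream calls
	}
	_ = ctx // downstream RPCs would receive ctx here
}

func main() {
	tracer := mocktracer.New()
	root := tracer.StartSpan("root")
	ctx := opentracing.ContextWithSpan(context.Background(), root)
	get(ctx)
	root.Finish()
	fmt.Println("finished spans:", len(tracer.FinishedSpans())) // 2: the child and the root
}
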
7 changes: 7 additions & 0 deletions util/execdetails/execdetails.go
@@ -430,6 +430,8 @@ const (
TpJoinRuntimeStats
// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
TpSelectResultRuntimeStats
// TpInsertRuntimeStat is the tp for insertRuntimeStat.
TpInsertRuntimeStat
)

// RuntimeStats is used to express the executor runtime information.
@@ -548,6 +550,11 @@ func (e *BasicRuntimeStats) String() string {
return fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop)
}

// GetTime gets the int64 total consumed time.
func (e *BasicRuntimeStats) GetTime() int64 {
return e.consume
}

// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
mu sync.Mutex
