Skip to content

Commit

Permalink
*:Record the time consuming of memory operation of Insert Executor in…
Browse files Browse the repository at this point in the history
… Runtime Information (#19574) (#20187)
  • Loading branch information
ti-srebot authored Oct 22, 2020
1 parent a0515bc commit 73563b0
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 16 deletions.
25 changes: 25 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6067,6 +6067,31 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk.MustQuery("select * from t1 for update").Check(testkit.Rows("5 6", "7 7"))
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values (9,9)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (10,10) on duplicate key update a=a+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (1,2)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, insert:.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values(11,11) on duplicate key update `a`=`a`+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:.*}")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("replace into t1 values (1,4)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prefetch:.*, rpc:.*")
tk.MustExec("rollback")
}

func (s *testSuite) TestIssue13758(c *C) {
Expand Down
16 changes: 14 additions & 2 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -82,6 +83,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
} else {
e.collectRuntimeStatsEnabled()
start := time.Now()
for i, row := range rows {
var err error
if i == 0 {
Expand All @@ -93,6 +96,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
}
if e.stats != nil {
e.stats.CheckInsertTime += time.Since(start)
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
Expand Down Expand Up @@ -178,6 +184,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row t
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
start := time.Now()
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
Expand All @@ -194,13 +201,15 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

PrefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.Prefetch += time.Since(PrefetchStart)
}
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
Expand Down Expand Up @@ -258,6 +267,9 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
}
}
if e.stats != nil {
e.stats.CheckInsertTime += time.Since(start)
}
return nil
}

Expand Down
106 changes: 102 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package executor

import (
"bytes"
"context"
"fmt"
"math"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -31,6 +35,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand Down Expand Up @@ -74,7 +79,7 @@ type InsertValues struct {
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
stats *InsertRuntimeStat
}

type defaultVal struct {
Expand Down Expand Up @@ -904,7 +909,6 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
if err != nil {
return 0, err
}

layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
Expand Down Expand Up @@ -936,8 +940,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
e.stats = &InsertRuntimeStat{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
Prefetch: 0,
CheckInsertTime: 0,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
Expand All @@ -951,7 +958,12 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
start := time.Now()
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
if err != nil {
Expand All @@ -969,11 +981,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
PrefetchStart := time.Now()

// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.Prefetch += time.Since(PrefetchStart)
}

// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
Expand All @@ -1000,6 +1016,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}
}

// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
Expand All @@ -1011,6 +1028,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
}
if e.stats != nil {
e.stats.CheckInsertTime += time.Since(start)
}
return nil
}

Expand Down Expand Up @@ -1041,3 +1061,81 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
}
return h, nil
}

// InsertRuntimeStat record the runtime information
type InsertRuntimeStat struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
CheckInsertTime time.Duration
Prefetch time.Duration
}

func (e *InsertRuntimeStat) String() string {
if e.CheckInsertTime == 0 {
// For replace statement.
if e.Prefetch > 0 && e.SnapshotRuntimeStats != nil {
return fmt.Sprintf("prefetch: %v, rpc:{%v}", e.Prefetch, e.SnapshotRuntimeStats.String())
}
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString(fmt.Sprintf("prepare:%v, ", time.Duration(e.BasicRuntimeStats.GetTime())-e.CheckInsertTime))
if e.Prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.CheckInsertTime, e.CheckInsertTime-e.Prefetch, e.Prefetch))
if e.SnapshotRuntimeStats != nil {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
}
buf.WriteString("}")
} else {
buf.WriteString(fmt.Sprintf("insert:%v", e.CheckInsertTime))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
newRs := &InsertRuntimeStat{
CheckInsertTime: e.CheckInsertTime,
Prefetch: e.Prefetch,
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
}
if e.BasicRuntimeStats != nil {
basicStats := e.BasicRuntimeStats.Clone()
newRs.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats)
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*InsertRuntimeStat)
if !ok {
return
}
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
} else {
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
}
}
if tmp.BasicRuntimeStats != nil {
if e.BasicRuntimeStats == nil {
basicStats := tmp.BasicRuntimeStats.Clone()
e.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats)
} else {
e.BasicRuntimeStats.Merge(tmp.BasicRuntimeStats)
}
}
e.Prefetch += tmp.Prefetch
e.CheckInsertTime += tmp.CheckInsertTime
}

// Tp implements the RuntimeStats interface.
func (e *InsertRuntimeStat) Tp() int {
return execdetails.TpInsertRuntimeStat
}
17 changes: 17 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"strconv"
"strings"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)
Expand Down Expand Up @@ -1148,3 +1151,17 @@ func (s *testSuite9) TestIssue16366(c *C) {
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "Duplicate entry '0' for key 'PRIMARY'"), IsTrue, Commentf("%v", err))
}

func (s *testSuite9) TestInsertRuntimeStat(c *C) {
stats := &executor.InsertRuntimeStat{
BasicRuntimeStats: &execdetails.BasicRuntimeStats{},
SnapshotRuntimeStats: nil,
CheckInsertTime: 2 * time.Second,
Prefetch: 1 * time.Second,
}
stats.BasicRuntimeStats.Record(5*time.Second, 1)
c.Assert(stats.String(), Equals, "prepare:3s, check_insert:{total_time:2s, mem_insert_time:1s, prefetch:1s}")
c.Assert(stats.String(), Equals, stats.Clone().String())
stats.Merge(stats.Clone())
c.Assert(stats.String(), Equals, "prepare:6s, check_insert:{total_time:4s, mem_insert_time:2s, prefetch:2s}")
}
6 changes: 5 additions & 1 deletion executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -201,13 +202,16 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (s *Scanner) startTS() uint64 {
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
val, err := s.snapshot.get(bo, kv.Key(current.Key))
ctx := context.Background()
val, err := s.snapshot.get(ctx, bo, current.Key)
if err != nil {
return errors.Trace(err)
}
Expand Down
15 changes: 7 additions & 8 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll

// Get gets the value for key k from snapshot.
func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

defer func(start time.Time) {
tikvTxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
}(time.Now())

ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
val, err := s.get(bo, k)
val, err := s.get(ctx, bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -351,7 +346,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return val, nil
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
s.mu.RLock()
if s.mu.cached != nil {
Expand All @@ -361,7 +356,11 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}
}
s.mu.RUnlock()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
if bo.ctx.Value("TestSnapshotCache") != nil {
panic("cache miss")
Expand Down
Loading

0 comments on commit 73563b0

Please sign in to comment.