From 73563b0fd0bb50575d28aacb4e4ee01473afe007 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Thu, 22 Oct 2020 13:07:34 +0800
Subject: [PATCH] *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574) (#20187)

---
 executor/executor_test.go       |  25 ++++++++
 executor/insert.go              |  16 ++++-
 executor/insert_common.go       | 106 ++++++++++++++++++++++++++++++--
 executor/insert_test.go         |  17 +++++
 executor/replace.go             |   6 +-
 store/tikv/scan.go              |   3 +-
 store/tikv/snapshot.go          |  15 +++--
 util/execdetails/execdetails.go |   7 +++
 8 files changed, 179 insertions(+), 16 deletions(-)

diff --git a/executor/executor_test.go b/executor/executor_test.go
index 53ef80339ce59..d55ff0428014a 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -6067,6 +6067,31 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
 	tk.MustQuery("select * from t1 for update").Check(testkit.Rows("5 6", "7 7"))
 	c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
 	tk.MustExec("rollback")
+
+	tk.MustExec("begin pessimistic")
+	tk.MustExec("insert ignore into t1 values (9,9)")
+	c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*")
+	tk.MustExec("rollback")
+
+	tk.MustExec("begin pessimistic")
+	tk.MustExec("insert into t1 values (10,10) on duplicate key update a=a+1")
+	c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}.*")
+	tk.MustExec("rollback")
+
+	tk.MustExec("begin pessimistic")
+	tk.MustExec("insert into t1 values (1,2)")
+	c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, insert:.*")
+	tk.MustExec("rollback")
+
+	tk.MustExec("begin pessimistic")
+	tk.MustExec("insert ignore into t1 values(11,11) on duplicate key update `a`=`a`+1")
+	c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:.*}")
+	tk.MustExec("rollback")
+
+	tk.MustExec("begin pessimistic")
+	tk.MustExec("replace into t1 values (1,4)")
+	c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prefetch:.*, rpc:.*")
+	tk.MustExec("rollback")
 }
 
 func (s *testSuite) TestIssue13758(c *C) {
diff --git a/executor/insert.go b/executor/insert.go
index 22ff24bd15e78..7913142a55cef 100644
--- a/executor/insert.go
+++ b/executor/insert.go
@@ -18,6 +18,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"runtime/trace"
+	"time"
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/parser/mysql"
@@ -82,6 +83,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
 			return err
 		}
 	} else {
+		e.collectRuntimeStatsEnabled()
+		start := time.Now()
 		for i, row := range rows {
 			var err error
 			if i == 0 {
@@ -93,6 +96,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
 				return err
 			}
 		}
+		if e.stats != nil {
+			e.stats.CheckInsertTime += time.Since(start)
+		}
 	}
 	e.memTracker.Consume(int64(txn.Size() - txnSize))
 	return nil
@@ -178,6 +184,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row t
 // batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
 func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
 	// Get keys need to be checked.
+	start := time.Now()
 	toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
 	if err != nil {
 		return err
@@ -194,13 +201,15 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
 			defer snapshot.DelOption(kv.CollectRuntimeStats)
 		}
 	}
-
+	PrefetchStart := time.Now()
 	// Use BatchGet to fill cache.
 	// It's an optimization and could be removed without affecting correctness.
 	if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
 		return err
 	}
-
+	if e.stats != nil {
+		e.stats.Prefetch += time.Since(PrefetchStart)
+	}
 	for i, r := range toBeCheckedRows {
 		if r.handleKey != nil {
 			handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
@@ -258,6 +267,9 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
 			}
 		}
 	}
+	if e.stats != nil {
+		e.stats.CheckInsertTime += time.Since(start)
+	}
 	return nil
 }
 
diff --git a/executor/insert_common.go b/executor/insert_common.go
index 0df7ba25558a8..58d2fbc7d998f 100644
--- a/executor/insert_common.go
+++ b/executor/insert_common.go
@@ -14,9 +14,13 @@
 package executor
 
 import (
+	"bytes"
 	"context"
+	"fmt"
 	"math"
+	"time"
 
+	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/model"
@@ -31,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
 	"go.uber.org/zap"
@@ -74,7 +79,7 @@ type InsertValues struct {
 	lazyFillAutoID bool
 	memTracker     *memory.Tracker
 
-	stats *runtimeStatsWithSnapshot
+	stats *InsertRuntimeStat
 }
 
 type defaultVal struct {
@@ -904,7 +909,6 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
 	if err != nil {
 		return 0, err
 	}
-
 	layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
 	if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
 		return 0, autoid.ErrAutoRandReadFailed
@@ -936,8 +940,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
 	if e.runtimeStats != nil {
 		if e.stats == nil {
 			snapshotStats := &tikv.SnapshotRuntimeStats{}
-			e.stats = &runtimeStatsWithSnapshot{
+			e.stats = &InsertRuntimeStat{
+				BasicRuntimeStats:    e.runtimeStats,
 				SnapshotRuntimeStats: snapshotStats,
+				Prefetch:             0,
+				CheckInsertTime:      0,
 			}
 			e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
 		}
@@ -951,7 +958,12 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
 	addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
 	// all the rows will be checked, so it is safe to set BatchCheck = true
 	e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
-
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		opentracing.ContextWithSpan(ctx, span1)
+	}
+	start := time.Now()
 	// Get keys need to be checked.
 	toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
 	if err != nil {
@@ -969,11 +981,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
 			defer snapshot.DelOption(kv.CollectRuntimeStats)
 		}
 	}
 
+	PrefetchStart := time.Now()
 	// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
 	if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
 		return err
 	}
+	if e.stats != nil {
+		e.stats.Prefetch += time.Since(PrefetchStart)
+	}
 
 	// append warnings and get no duplicated error rows
 	for i, r := range toBeCheckedRows {
@@ -1000,6 +1016,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
 				return err
 			}
 		}
+
 		// If row was checked with no duplicate keys,
 		// it should be add to values map for the further row check.
 		// There may be duplicate keys inside the insert statement.
@@ -1011,6 +1028,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
 			}
 		}
 	}
+	if e.stats != nil {
+		e.stats.CheckInsertTime += time.Since(start)
+	}
 	return nil
 }
 
@@ -1041,3 +1061,81 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
 	}
 	return h, nil
 }
+
+// InsertRuntimeStat record the runtime information
+type InsertRuntimeStat struct {
+	*execdetails.BasicRuntimeStats
+	*tikv.SnapshotRuntimeStats
+	CheckInsertTime time.Duration
+	Prefetch        time.Duration
+}
+
+func (e *InsertRuntimeStat) String() string {
+	if e.CheckInsertTime == 0 {
+		// For replace statement.
+		if e.Prefetch > 0 && e.SnapshotRuntimeStats != nil {
+			return fmt.Sprintf("prefetch: %v, rpc:{%v}", e.Prefetch, e.SnapshotRuntimeStats.String())
+		}
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 32))
+	buf.WriteString(fmt.Sprintf("prepare:%v, ", time.Duration(e.BasicRuntimeStats.GetTime())-e.CheckInsertTime))
+	if e.Prefetch > 0 {
+		buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.CheckInsertTime, e.CheckInsertTime-e.Prefetch, e.Prefetch))
+		if e.SnapshotRuntimeStats != nil {
+			buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
+		}
+		buf.WriteString("}")
+	} else {
+		buf.WriteString(fmt.Sprintf("insert:%v", e.CheckInsertTime))
+	}
+	return buf.String()
+}
+
+// Clone implements the RuntimeStats interface.
+func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
+	newRs := &InsertRuntimeStat{
+		CheckInsertTime: e.CheckInsertTime,
+		Prefetch:        e.Prefetch,
+	}
+	if e.SnapshotRuntimeStats != nil {
+		snapshotStats := e.SnapshotRuntimeStats.Clone()
+		newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+	}
+	if e.BasicRuntimeStats != nil {
+		basicStats := e.BasicRuntimeStats.Clone()
+		newRs.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats)
+	}
+	return newRs
+}
+
+// Merge implements the RuntimeStats interface.
+func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) {
+	tmp, ok := other.(*InsertRuntimeStat)
+	if !ok {
+		return
+	}
+	if tmp.SnapshotRuntimeStats != nil {
+		if e.SnapshotRuntimeStats == nil {
+			snapshotStats := tmp.SnapshotRuntimeStats.Clone()
+			e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+		} else {
+			e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
+		}
+	}
+	if tmp.BasicRuntimeStats != nil {
+		if e.BasicRuntimeStats == nil {
+			basicStats := tmp.BasicRuntimeStats.Clone()
+			e.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats)
+		} else {
+			e.BasicRuntimeStats.Merge(tmp.BasicRuntimeStats)
+		}
+	}
+	e.Prefetch += tmp.Prefetch
+	e.CheckInsertTime += tmp.CheckInsertTime
+}
+
+// Tp implements the RuntimeStats interface.
+func (e *InsertRuntimeStat) Tp() int {
+	return execdetails.TpInsertRuntimeStat
+}
diff --git a/executor/insert_test.go b/executor/insert_test.go
index 24d8a12184c3f..e2176761dc09e 100644
--- a/executor/insert_test.go
+++ b/executor/insert_test.go
@@ -18,13 +18,16 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testutil"
 )
@@ -1148,3 +1151,17 @@ func (s *testSuite9) TestIssue16366(c *C) {
 	c.Assert(err, NotNil)
 	c.Assert(strings.Contains(err.Error(), "Duplicate entry '0' for key 'PRIMARY'"), IsTrue, Commentf("%v", err))
 }
+
+func (s *testSuite9) TestInsertRuntimeStat(c *C) {
+	stats := &executor.InsertRuntimeStat{
+		BasicRuntimeStats:    &execdetails.BasicRuntimeStats{},
+		SnapshotRuntimeStats: nil,
+		CheckInsertTime:      2 * time.Second,
+		Prefetch:             1 * time.Second,
+	}
+	stats.BasicRuntimeStats.Record(5*time.Second, 1)
+	c.Assert(stats.String(), Equals, "prepare:3s, check_insert:{total_time:2s, mem_insert_time:1s, prefetch:1s}")
+	c.Assert(stats.String(), Equals, stats.Clone().String())
+	stats.Merge(stats.Clone())
+	c.Assert(stats.String(), Equals, "prepare:6s, check_insert:{total_time:4s, mem_insert_time:2s, prefetch:2s}")
+}
diff --git a/executor/replace.go b/executor/replace.go
index a3f798a524aaa..450d96c117403 100644
--- a/executor/replace.go
+++ b/executor/replace.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"runtime/trace"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/mysql"
@@ -201,13 +202,16 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
 			defer snapshot.DelOption(kv.CollectRuntimeStats)
 		}
 	}
 
+	prefetchStart := time.Now()
 	// Use BatchGet to fill cache.
 	// It's an optimization and could be removed without affecting correctness.
 	if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
 		return err
 	}
-
+	if e.stats != nil {
+		e.stats.Prefetch = time.Since(prefetchStart)
+	}
 	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
 	for _, r := range toBeCheckedRows {
 		err = e.replaceRow(ctx, r)
diff --git a/store/tikv/scan.go b/store/tikv/scan.go
index c3cbd6b5bebe1..b4272ac6857c0 100644
--- a/store/tikv/scan.go
+++ b/store/tikv/scan.go
@@ -144,7 +144,8 @@ func (s *Scanner) startTS() uint64 {
 }
 
 func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
-	val, err := s.snapshot.get(bo, kv.Key(current.Key))
+	ctx := context.Background()
+	val, err := s.snapshot.get(ctx, bo, current.Key)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index 2896a16b19843..6e7a88ac238f4 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -323,11 +323,6 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
 
 // Get gets the value for key k from snapshot.
 func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
-	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
-		defer span1.Finish()
-		ctx = opentracing.ContextWithSpan(ctx, span1)
-	}
 
 	defer func(start time.Time) {
 		tikvTxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
@@ -335,7 +330,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
 	ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
 	bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
-	val, err := s.get(bo, k)
+	val, err := s.get(ctx, bo, k)
 	s.recordBackoffInfo(bo)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -351,7 +346,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
 	return val, nil
 }
 
-func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
+func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
 	// Check the cached values first.
 	s.mu.RLock()
 	if s.mu.cached != nil {
@@ -361,7 +356,11 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
 		}
 	}
 	s.mu.RUnlock()
-
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		opentracing.ContextWithSpan(ctx, span1)
+	}
 	failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
 		if bo.ctx.Value("TestSnapshotCache") != nil {
 			panic("cache miss")
diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go
index 3da02d56185a2..5b7c28e2d8909 100644
--- a/util/execdetails/execdetails.go
+++ b/util/execdetails/execdetails.go
@@ -430,6 +430,8 @@ const (
 	TpJoinRuntimeStats
 	// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
 	TpSelectResultRuntimeStats
+	// TpInsertRuntimeStat is the tp for InsertRuntimeStat
+	TpInsertRuntimeStat
 )
 
 // RuntimeStats is used to express the executor runtime information.
@@ -548,6 +550,11 @@ func (e *BasicRuntimeStats) String() string {
 	return fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop)
 }
 
+// GetTime get the int64 total time
+func (e *BasicRuntimeStats) GetTime() int64 {
+	return e.consume
+}
+
 // RuntimeStatsColl collects executors's execution info.
 type RuntimeStatsColl struct {
 	mu sync.Mutex