Skip to content

Commit

Permalink
cherry pick pingcap#19574 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
jyz0309 authored and ti-srebot committed Sep 23, 2020
1 parent d1a152f commit 387c135
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 13 deletions.
80 changes: 80 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6003,6 +6003,86 @@ func (s *testSuite) TestIssue19372(c *C) {
tk.MustQuery("select (select t2.c_str from t2 where t2.c_str <= t1.c_str and t2.c_int in (1, 2) order by t2.c_str limit 1) x from t1 order by c_int;").Check(testkit.Rows("a", "a", "a"))
}

<<<<<<< HEAD
=======
func (s *testSerialSuite1) TestCollectCopRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int)")
tk.MustExec("set tidb_enable_collect_execution_info=1;")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult", `return(true)`), IsNil)
rows := tk.MustQuery("explain analyze select * from t1").Rows()
c.Assert(len(rows), Equals, 2)
explain := fmt.Sprintf("%v", rows[0])
c.Assert(explain, Matches, ".*rpc_num: 2, .*regionMiss:.*")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult"), IsNil)
}

func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, unique index (a))")

testSQLs := []string{
"insert ignore into t1 values (5,5);",
"insert into t1 values (5,5) on duplicate key update a=a+1;",
"replace into t1 values (5,6),(6,7)",
"update t1 set a=a+1 where a=6;",
}

getRootStats := func() string {
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
return stats.String()
}
for _, sql := range testSQLs {
tk.MustExec(sql)
c.Assert(getRootStats(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
}

// Test for lock keys stats.
tk.MustExec("begin pessimistic")
tk.MustExec("update t1 set b=b+1")
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 for update").Check(testkit.Rows("5 6", "7 7"))
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values (9,9)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (10,10) on duplicate key update a=a+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:{BatchGet:{num_rpc:.*, total_time:.*}.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values (1,2)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, insert:.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("insert ignore into t1 values(11,11) on duplicate key update `a`=`a`+1")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prepare:.*, check_insert:{total_time:.*, mem_insert_time:.*, prefetch:.*, rpc:.*}")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustExec("replace into t1 values (1,4)")
c.Assert(getRootStats(), Matches, "time:.*, loops:.*, prefetch:.*, rpc:.*")
tk.MustExec("rollback")
}

>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
func (s *testSuite) TestIssue13758(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
25 changes: 23 additions & 2 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -82,6 +83,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
} else {
e.collectRuntimeStatsEnabled()
start := time.Now()
for i, row := range rows {
var err error
if i == 0 {
Expand All @@ -93,6 +96,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
Expand Down Expand Up @@ -178,6 +184,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row t
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
start := time.Now()
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
Expand All @@ -188,12 +195,24 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

<<<<<<< HEAD
=======
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
Expand Down Expand Up @@ -251,6 +270,9 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

Expand Down Expand Up @@ -320,7 +342,6 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
e.ctx.GetSessionVars().CurrInsertValues = e.curInsertVals.ToRow()

// NOTE: In order to execute the expression inside the column assignment,
// we have to put the value of "oldRow" before "newRow" in "row4Update" to
// be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
Expand Down
120 changes: 118 additions & 2 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package executor

import (
"bytes"
"context"
"fmt"
"math"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -30,6 +34,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand Down Expand Up @@ -72,6 +77,11 @@ type InsertValues struct {
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
memTracker *memory.Tracker
<<<<<<< HEAD
=======

stats *insertRuntimeStat
>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
}

type defaultVal struct {
Expand Down Expand Up @@ -901,7 +911,6 @@ func (e *InsertValues) allocAutoRandomID(fieldType *types.FieldType) (int64, err
if err != nil {
return 0, err
}

layout := autoid.NewAutoRandomIDLayout(fieldType, tableInfo.AutoRandomBits)
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
Expand Down Expand Up @@ -929,12 +938,37 @@ func (e *InsertValues) handleWarning(err error) {
sc.AppendWarning(err)
}

<<<<<<< HEAD
=======
func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &insertRuntimeStat{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
prefetch: 0,
checkInsertTime: 0,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
start := time.Now()
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
if err != nil {
Expand All @@ -945,11 +979,24 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}
<<<<<<< HEAD

=======
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.prefetch += time.Since(prefetchStart)
}

// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
Expand All @@ -976,6 +1023,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}
}

// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
Expand All @@ -987,6 +1035,9 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
}
if e.stats != nil {
e.stats.checkInsertTime += time.Since(start)
}
return nil
}

Expand Down Expand Up @@ -1017,3 +1068,68 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
}
return h, nil
}

type insertRuntimeStat struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
checkInsertTime time.Duration
prefetch time.Duration
}

func (e *insertRuntimeStat) String() string {
if e.checkInsertTime == 0 {
// For replace statement.
if e.prefetch > 0 && e.SnapshotRuntimeStats != nil {
return fmt.Sprintf("prefetch: %v, rpc:{%v}", e.prefetch, e.SnapshotRuntimeStats.String())
}
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString(fmt.Sprintf("prepare:%v, ", time.Duration(e.BasicRuntimeStats.GetTime())-e.checkInsertTime))
if e.prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.checkInsertTime, e.checkInsertTime-e.prefetch, e.prefetch))
if e.SnapshotRuntimeStats != nil {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
}
buf.WriteString("}")
} else {
buf.WriteString(fmt.Sprintf("insert:%v", e.checkInsertTime))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (e *insertRuntimeStat) Clone() execdetails.RuntimeStats {
newRs := &insertRuntimeStat{
checkInsertTime: e.checkInsertTime,
prefetch: e.prefetch,
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (e *insertRuntimeStat) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*insertRuntimeStat)
if !ok {
return
}
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
} else {
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
}
}
e.prefetch += tmp.prefetch
e.checkInsertTime += tmp.checkInsertTime
}

// Tp implements the RuntimeStats interface.
func (e *insertRuntimeStat) Tp() int {
return execdetails.TpInsertRuntimeStat
}
15 changes: 14 additions & 1 deletion executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -195,12 +196,24 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
txnSize := txn.Size()

<<<<<<< HEAD
=======
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
>>>>>>> bb354b0... *:Record the time consuming of memory operation of Insert Executor in Runtime Information (#19574)
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.prefetch = time.Since(prefetchStart)
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
Expand Down
Loading

0 comments on commit 387c135

Please sign in to comment.