Skip to content

Commit

Permalink
executor: show back-off details in slow log (pingcap#13770)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and XiaTianliang committed Dec 21, 2019
1 parent 3afa113 commit 25736c8
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 24 deletions.
26 changes: 18 additions & 8 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,23 +317,33 @@ func (*testSuite) TestT(c *C) {

res := dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(res[0].SQL, Equals, "bbb")
c.Assert(res[0].Duration, Equals, 3*time.Second)
c.Assert(res[1].SQL, Equals, "ccc")
c.Assert(res[1].Duration, Equals, 2*time.Second)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2, Kind: ast.ShowSlowKindInternal})
c.Assert(res, HasLen, 1)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})
c.Assert(res[0].SQL, Equals, "aaa")
c.Assert(res[0].Duration, Equals, time.Second)
c.Assert(res[0].Internal, Equals, true)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 4, Kind: ast.ShowSlowKindAll})
c.Assert(res, HasLen, 3)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[2], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})
c.Assert(res[0].SQL, Equals, "bbb")
c.Assert(res[0].Duration, Equals, 3*time.Second)
c.Assert(res[1].SQL, Equals, "ccc")
c.Assert(res[1].Duration, Equals, 2*time.Second)
c.Assert(res[2].SQL, Equals, "aaa")
c.Assert(res[2].Duration, Equals, time.Second)
c.Assert(res[2].Internal, Equals, true)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowRecent, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(res[0].SQL, Equals, "ccc")
c.Assert(res[0].Duration, Equals, 2*time.Second)
c.Assert(res[1].SQL, Equals, "bbb")
c.Assert(res[1].Duration, Equals, 3*time.Second)

metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, so create a new ticker will panic.
Expand Down
28 changes: 14 additions & 14 deletions domain/topn_slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,28 +116,28 @@ func (t *testTopNSlowQuerySuite) TestQueue(c *C) {
q.Append(&SlowQueryInfo{SQL: "ccc"})

query := q.recent.Query(1)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(query[0].SQL, Equals, "ccc")
query = q.recent.Query(2)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
c.Assert(query[0].SQL, Equals, "ccc")
c.Assert(query[1].SQL, Equals, "bbb")
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "aaa"})
c.Assert(query[0].SQL, Equals, "ccc")
c.Assert(query[1].SQL, Equals, "bbb")
c.Assert(query[2].SQL, Equals, "aaa")

q.Append(&SlowQueryInfo{SQL: "ddd"})
q.Append(&SlowQueryInfo{SQL: "eee"})
q.Append(&SlowQueryInfo{SQL: "fff"})
q.Append(&SlowQueryInfo{SQL: "ggg"})

query = q.recent.Query(3)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
c.Assert(query[0].SQL, Equals, "ggg")
c.Assert(query[1].SQL, Equals, "fff")
c.Assert(query[2].SQL, Equals, "eee")
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
c.Assert(*query[3], Equals, SlowQueryInfo{SQL: "ddd"})
c.Assert(*query[4], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(query[0].SQL, Equals, "ggg")
c.Assert(query[1].SQL, Equals, "fff")
c.Assert(query[2].SQL, Equals, "eee")
c.Assert(query[3].SQL, Equals, "ddd")
c.Assert(query[4].SQL, Equals, "ccc")
}
56 changes: 55 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,15 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
d := &CopTasksDetails{NumCopTasks: n}
d := &CopTasksDetails{
NumCopTasks: n,
MaxBackoffTime: make(map[string]time.Duration),
AvgBackoffTime: make(map[string]time.Duration),
P90BackoffTime: make(map[string]time.Duration),
TotBackoffTime: make(map[string]time.Duration),
TotBackoffTimes: make(map[string]int),
MaxBackoffAddress: make(map[string]string),
}
if n == 0 {
return d
}
Expand All @@ -540,6 +548,45 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress

// calculate backoff details
type backoffItem struct {
callee string
sleepTime time.Duration
times int
}
backoffInfo := make(map[string][]backoffItem)
for _, ed := range sc.mu.allExecDetails {
for backoff := range ed.BackoffTimes {
backoffInfo[backoff] = append(backoffInfo[backoff], backoffItem{
callee: ed.CalleeAddress,
sleepTime: ed.BackoffSleep[backoff],
times: ed.BackoffTimes[backoff],
})
}
}
for backoff, items := range backoffInfo {
if len(items) == 0 {
continue
}
sort.Slice(items, func(i, j int) bool {
return items[i].sleepTime < items[j].sleepTime
})
n := len(items)
d.MaxBackoffAddress[backoff] = items[n-1].callee
d.MaxBackoffTime[backoff] = items[n-1].sleepTime
d.P90BackoffTime[backoff] = items[n*9/10].sleepTime

var totalTime time.Duration
totalTimes := 0
for _, it := range items {
totalTime += it.sleepTime
totalTimes += it.times
}
d.AvgBackoffTime[backoff] = totalTime / time.Duration(n)
d.TotBackoffTime[backoff] = totalTime
d.TotBackoffTimes[backoff] = totalTimes
}
return d
}

Expand All @@ -556,6 +603,13 @@ type CopTasksDetails struct {
P90WaitTime time.Duration
MaxWaitAddress string
MaxWaitTime time.Duration

MaxBackoffTime map[string]time.Duration
MaxBackoffAddress map[string]string
AvgBackoffTime map[string]time.Duration
P90BackoffTime map[string]time.Duration
TotBackoffTime map[string]time.Duration
TotBackoffTimes map[string]int
}

// ToZapFields wraps the CopTasksDetails as zap.Fileds.
Expand Down
15 changes: 15 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ var _ = Suite(&stmtctxSuit{})

func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
ctx := new(stmtctx.StatementContext)
backoffs := []string{"tikvRPC", "pdRPC", "regionMiss"}
for i := 0; i < 100; i++ {
d := &execdetails.ExecDetails{
CalleeAddress: fmt.Sprintf("%v", i+1),
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
BackoffSleep: make(map[string]time.Duration),
BackoffTimes: make(map[string]int),
}
for _, backoff := range backoffs {
d.BackoffSleep[backoff] = time.Millisecond * 100 * time.Duration(i+1)
d.BackoffTimes[backoff] = i + 1
}
ctx.MergeExecDetails(d, nil)
}
Expand All @@ -53,6 +60,14 @@ func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
c.Assert(d.MaxWaitAddress, Equals, "100")
fields := d.ToZapFields()
c.Assert(len(fields), Equals, 9)
for _, backoff := range backoffs {
c.Assert(d.MaxBackoffAddress[backoff], Equals, "100")
c.Assert(d.MaxBackoffTime[backoff], Equals, 100*time.Millisecond*100)
c.Assert(d.P90BackoffTime[backoff], Equals, time.Millisecond*100*91)
c.Assert(d.AvgBackoffTime[backoff], Equals, time.Millisecond*100*101/2)
c.Assert(d.TotBackoffTimes[backoff], Equals, 101*50)
c.Assert(d.TotBackoffTime[backoff], Equals, 101*50*100*time.Millisecond)
}
}

func (s *stmtctxSuit) TestStatementContextPushDownFLags(c *C) {
Expand Down
29 changes: 28 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1186,6 +1187,8 @@ const (
SlowLogCopWaitMax = "Cop_wait_max"
// SlowLogCopWaitAddr is the address of TiKV where the cop-task which cost wait process time run.
SlowLogCopWaitAddr = "Cop_wait_addr"
// SlowLogCopBackoffPrefix contains backoff information.
SlowLogCopBackoffPrefix = "Cop_backoff_"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
// SlowLogPrepared is used to indicate whether this sql execute in prepare.
Expand Down Expand Up @@ -1299,14 +1302,27 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
if logItems.CopTasks != nil {
writeSlowLogItem(&buf, SlowLogNumCopTasksStr, strconv.FormatInt(int64(logItems.CopTasks.NumCopTasks), 10))
if logItems.CopTasks.NumCopTasks > 0 {
// make the result stable
backoffs := make([]string, 0, 3)
for backoff := range logItems.CopTasks.TotBackoffTimes {
backoffs = append(backoffs, backoff)
}
sort.Strings(backoffs)

if logItems.CopTasks.NumCopTasks == 1 {
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v",
SlowLogCopProcAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgProcessTime.Seconds(),
SlowLogCopProcAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxProcessAddress) + "\n")
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v",
SlowLogCopWaitAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgWaitTime.Seconds(),
SlowLogCopWaitAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitAddress) + "\n")

for _, backoff := range backoffs {
backoffPrefix := SlowLogCopBackoffPrefix + backoff + "_"
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v\n",
backoffPrefix+"total_times", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTimes[backoff],
backoffPrefix+"total_time", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTime[backoff].Seconds(),
))
}
} else {
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v %v%v%v %v%v%v",
SlowLogCopProcAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgProcessTime.Seconds(),
Expand All @@ -1318,6 +1334,17 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
SlowLogCopWaitP90, SlowLogSpaceMarkStr, logItems.CopTasks.P90WaitTime.Seconds(),
SlowLogCopWaitMax, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitTime.Seconds(),
SlowLogCopWaitAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitAddress) + "\n")
for _, backoff := range backoffs {
backoffPrefix := SlowLogCopBackoffPrefix + backoff + "_"
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v %v%v%v %v%v%v %v%v%v %v%v%v\n",
backoffPrefix+"total_times", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTimes[backoff],
backoffPrefix+"total_time", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTime[backoff].Seconds(),
backoffPrefix+"max_time", SlowLogSpaceMarkStr, logItems.CopTasks.MaxBackoffTime[backoff].Seconds(),
backoffPrefix+"max_addr", SlowLogSpaceMarkStr, logItems.CopTasks.MaxBackoffAddress[backoff],
backoffPrefix+"avg_time", SlowLogSpaceMarkStr, logItems.CopTasks.AvgBackoffTime[backoff].Seconds(),
backoffPrefix+"p90_time", SlowLogSpaceMarkStr, logItems.CopTasks.P90BackoffTime[backoff].Seconds(),
))
}
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,24 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
P90WaitTime: time.Millisecond * 20,
MaxWaitTime: time.Millisecond * 30,
MaxWaitAddress: "10.6.131.79",
MaxBackoffTime: make(map[string]time.Duration),
AvgBackoffTime: make(map[string]time.Duration),
P90BackoffTime: make(map[string]time.Duration),
TotBackoffTime: make(map[string]time.Duration),
TotBackoffTimes: make(map[string]int),
MaxBackoffAddress: make(map[string]string),
}

backoffs := []string{"rpcTiKV", "rpcPD", "regionMiss"}
for _, backoff := range backoffs {
copTasks.MaxBackoffTime[backoff] = time.Millisecond * 200
copTasks.MaxBackoffAddress[backoff] = "127.0.0.1"
copTasks.AvgBackoffTime[backoff] = time.Millisecond * 200
copTasks.P90BackoffTime[backoff] = time.Millisecond * 200
copTasks.TotBackoffTime[backoff] = time.Millisecond * 200
copTasks.TotBackoffTimes[backoff] = 200
}

var memMax int64 = 2333
resultString := `# Txn_start_ts: 406649736972468225
# User: root@192.168.0.1
Expand All @@ -168,6 +185,9 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Num_cop_tasks: 10
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
# Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2
# Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2
# Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2
# Mem_max: 2333
# Prepared: true
# Has_more_results: true
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ type Backoffer struct {
types []fmt.Stringer
vars *kv.Variables
noop bool

backoffSleepMS map[backoffType]int
backoffTimes map[backoffType]int
}

type txnStartCtxKeyType struct{}
Expand Down Expand Up @@ -332,6 +335,14 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
realSleep := f(b.ctx, maxSleepMs)
backoffDuration.Observe(float64(realSleep) / 1000)
b.totalSleep += realSleep
if b.backoffSleepMS == nil {
b.backoffSleepMS = make(map[backoffType]int)
}
b.backoffSleepMS[typ] += realSleep
if b.backoffTimes == nil {
b.backoffTimes = make(map[backoffType]int)
}
b.backoffTimes[typ]++

var startTs interface{}
if ts := b.ctx.Value(txnStartKey); ts != nil {
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
resp.detail = new(execdetails.ExecDetails)
}
resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
resp.detail.BackoffSleep, resp.detail.BackoffTimes = make(map[string]time.Duration), make(map[string]int)
for backoff := range bo.backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
}
Expand Down
2 changes: 2 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type ExecDetails struct {
ProcessTime time.Duration
WaitTime time.Duration
BackoffTime time.Duration
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
RequestCount int
TotalKeys int64
ProcessedKeys int64
Expand Down

0 comments on commit 25736c8

Please sign in to comment.