Skip to content

Commit

Permalink
*: record more rpc runtime information in cop runt ... (pingcap#18916) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Sep 3, 2020
1 parent b92dd3f commit f5b1d1f
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 172 deletions.
6 changes: 0 additions & 6 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -422,11 +421,6 @@ func (r *mockResultSubset) GetData() []byte { return r.data }
// GetStartKey implements kv.ResultSubset interface.
func (r *mockResultSubset) GetStartKey() kv.Key { return nil }

// GetExecDetails implements kv.ResultSubset interface.
func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

Expand Down
142 changes: 134 additions & 8 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
package distsql

import (
"bytes"
"context"
"fmt"
"sort"
"strconv"
"sync/atomic"
"time"

Expand All @@ -25,6 +29,8 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -82,6 +88,8 @@ type selectResult struct {
fetchDuration time.Duration
durationReported bool
memTracker *memory.Tracker

stats *selectResultRuntimeStats
}

func (r *selectResult) Fetch(ctx context.Context) {
Expand Down Expand Up @@ -129,14 +137,18 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
for _, warning := range r.selectResp.Warnings {
sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg))
}
resultDetail := resultSubset.GetExecDetails()
r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime())
r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
if resultDetail != nil {
resultDetail.CopTime = duration

hasStats, ok := resultSubset.(CopRuntimeStats)
if ok {
copStats := hasStats.GetCopRuntimeStats()
if copStats != nil {
r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime())
copStats.CopTime = duration
sc.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
sc.MergeExecDetails(resultDetail, nil)
if len(r.selectResp.Chunks) != 0 {
break
}
Expand Down Expand Up @@ -232,8 +244,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execdetails.ExecDetails, respTime time.Duration) {
callee := detail.CalleeAddress
func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
Expand All @@ -244,8 +256,19 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execde

return
}
if r.stats == nil {
stmtCtx := r.ctx.GetSessionVars().StmtCtx
id := r.rootPlanID
originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
r.stats = &selectResultRuntimeStats{
RuntimeStats: originRuntimeStats,
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID, respTime, detail)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
Expand Down Expand Up @@ -289,3 +312,106 @@ func (r *selectResult) Close() error {
}
return r.resp.Close()
}

// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
GetCopRuntimeStats() *tikv.CopRuntimeStats
}

type selectResultRuntimeStats struct {
execdetails.RuntimeStats
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
s.copRespTime = append(s.copRespTime, respTime)
s.procKeys = append(s.procKeys, copStats.ProcessedKeys)

for k, v := range copStats.BackoffSleep {
s.backoffSleep[k] += v
}
s.totalProcessTime += copStats.ProcessTime
s.totalWaitTime += copStats.WaitTime
s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
}

func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
if s.RuntimeStats != nil {
buf.WriteString(s.RuntimeStats.String())
}
if len(s.copRespTime) > 0 {
size := len(s.copRespTime)
buf.WriteString(", ")
if size == 1 {
buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0]))
} else {
sort.Slice(s.copRespTime, func(i, j int) bool {
return s.copRespTime[i] < s.copRespTime[j]
})
vMax, vMin := s.copRespTime[size-1], s.copRespTime[0]
vP95 := s.copRespTime[size*19/20]
sum := 0.0
for _, t := range s.copRespTime {
sum += float64(t)
}
vAvg := time.Duration(sum / float64(size))

sort.Slice(s.procKeys, func(i, j int) bool {
return s.procKeys[i] < s.procKeys[j]
})
keyMax := s.procKeys[size-1]
keyP95 := s.procKeys[size*19/20]
buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95))
if keyMax > 0 {
buf.WriteString(", max_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyMax, 10))
buf.WriteString(", p95_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyP95, 10))
}
if s.totalProcessTime > 0 {
buf.WriteString(", tot_proc: ")
buf.WriteString(s.totalProcessTime.String())
if s.totalWaitTime > 0 {
buf.WriteString(", tot_wait: ")
buf.WriteString(s.totalWaitTime.String())
}
}
}
}
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
}
buf.WriteString("}")

rpcStatsStr := s.rpcStat.String()
if len(rpcStatsStr) > 0 {
buf.WriteString(", ")
buf.WriteString(rpcStatsStr)
}

if len(s.backoffSleep) > 0 {
buf.WriteString(", backoff{")
idx := 0
for k, d := range s.backoffSleep {
if idx > 0 {
buf.WriteString(", ")
}
idx++
buf.WriteString(fmt.Sprintf("%s: %s", k, d.String()))
}
buf.WriteString("}")
}
return buf.String()
}
7 changes: 4 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -29,7 +30,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
Expand All @@ -39,12 +40,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1")
}
12 changes: 8 additions & 4 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons
}
r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts)
r.partialCount++
resultDetail := resultSubset.GetExecDetails()
if resultDetail != nil {
resultDetail.CopTime = duration

hasStats, ok := resultSubset.(CopRuntimeStats)
if ok {
copStats := hasStats.GetCopRuntimeStats()
if copStats != nil {
copStats.CopTime = duration
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil)
return false, nil
}

Expand Down
3 changes: 0 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -341,8 +340,6 @@ type ResultSubset interface {
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
// GetExecDetails gets the detail information.
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
// RespTime returns the response time for the request.
Expand Down
6 changes: 0 additions & 6 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,12 +1030,6 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor
analyzeInfo = "time:0ns, loops:0"
actRows = "0"
}
switch p.(type) {
case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader:
if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 {
analyzeInfo += ", " + s.String()
}
}

memoryInfo = "N/A"
memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
Expand Down
12 changes: 4 additions & 8 deletions store/tikv/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand All @@ -42,7 +41,7 @@ type batchCopTask struct {

type batchCopResponse struct {
pbResp *coprocessor.BatchResponse
detail *execdetails.ExecDetails
detail *CopRuntimeStats

// batch Cop Response is yet to return startKey. So batchCop cannot retry partially.
startKey kv.Key
Expand All @@ -63,7 +62,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key {

// GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop.
// TODO: Will fix in near future.
func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails {
func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats {
return rs.detail
}

Expand All @@ -77,9 +76,6 @@ func (rs *batchCopResponse) MemSize() int64 {
rs.respSize += int64(cap(rs.startKey))
if rs.detail != nil {
rs.respSize += int64(sizeofExecDetails)
if rs.detail.CommitDetail != nil {
rs.respSize += int64(sizeofCommitDetails)
}
}
if rs.pbResp != nil {
// Using a approximate size since it's hard to get a accurate value.
Expand Down Expand Up @@ -304,7 +300,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
for idx := 0; idx < len(tasks); idx++ {
ret, err := b.handleTaskOnce(ctx, bo, tasks[idx])
if err != nil {
resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)}
resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)}
b.sendToRespCh(resp)
break
}
Expand Down Expand Up @@ -415,7 +411,7 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro

resp := batchCopResponse{
pbResp: response,
detail: new(execdetails.ExecDetails),
detail: new(CopRuntimeStats),
}

resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
Expand Down
Loading

0 comments on commit f5b1d1f

Please sign in to comment.