diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 84a4664366994..e65d94224958f 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" ) @@ -422,11 +421,6 @@ func (r *mockResultSubset) GetData() []byte { return r.data } // GetStartKey implements kv.ResultSubset interface. func (r *mockResultSubset) GetStartKey() kv.Key { return nil } -// GetExecDetails implements kv.ResultSubset interface. -func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { - return &execdetails.ExecDetails{} -} - // MemSize implements kv.ResultSubset interface. func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } diff --git a/distsql/select_result.go b/distsql/select_result.go index 98f607d38d228..365b5cb6adb28 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,7 +14,11 @@ package distsql import ( + "bytes" "context" + "fmt" + "sort" + "strconv" "sync/atomic" "time" @@ -25,6 +29,8 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -82,6 +88,8 @@ type selectResult struct { fetchDuration time.Duration durationReported bool memTracker *memory.Tracker + + stats *selectResultRuntimeStats } func (r *selectResult) Fetch(ctx context.Context) { @@ -129,14 +137,18 @@ func (r *selectResult) fetchResp(ctx context.Context) error { for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) } - resultDetail := resultSubset.GetExecDetails() - r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime()) r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime()) + copStats.CopTime = duration + sc.MergeExecDetails(&copStats.ExecDetails, nil) + } } - sc.MergeExecDetails(resultDetail, nil) if len(r.selectResp.Chunks) != 0 { break } @@ -232,8 +244,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execdetails.ExecDetails, respTime time.Duration) { - callee := detail.CalleeAddress +func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) { + callee := copStats.CalleeAddress if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } @@ -244,8 +256,19 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execde return } + if r.stats == nil { + stmtCtx := r.ctx.GetSessionVars().StmtCtx + id := r.rootPlanID + originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id) + r.stats = &selectResultRuntimeStats{ + RuntimeStats: originRuntimeStats, + backoffSleep: make(map[string]time.Duration), + rpcStat: 
tikv.NewRegionRequestRuntimeStats(), + } + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats) + } + r.stats.mergeCopRuntimeStats(copStats, respTime) - r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID, respTime, detail) for i, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { @@ -289,3 +312,106 @@ func (r *selectResult) Close() error { } return r.resp.Close() } + +// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats. +type CopRuntimeStats interface { + // GetCopRuntimeStats gets the cop runtime stats information. + GetCopRuntimeStats() *tikv.CopRuntimeStats +} + +type selectResultRuntimeStats struct { + execdetails.RuntimeStats + copRespTime []time.Duration + procKeys []int64 + backoffSleep map[string]time.Duration + totalProcessTime time.Duration + totalWaitTime time.Duration + rpcStat tikv.RegionRequestRuntimeStats +} + +func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) { + s.copRespTime = append(s.copRespTime, respTime) + s.procKeys = append(s.procKeys, copStats.ProcessedKeys) + + for k, v := range copStats.BackoffSleep { + s.backoffSleep[k] += v + } + s.totalProcessTime += copStats.ProcessTime + s.totalWaitTime += copStats.WaitTime + s.rpcStat.Merge(copStats.RegionRequestRuntimeStats) +} + +func (s *selectResultRuntimeStats) String() string { + buf := bytes.NewBuffer(nil) + if s.RuntimeStats != nil { + buf.WriteString(s.RuntimeStats.String()) + } + if len(s.copRespTime) > 0 { + size := len(s.copRespTime) + buf.WriteString(", ") + if size == 1 { + buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0])) + } else { + sort.Slice(s.copRespTime, func(i, j int) bool { + return s.copRespTime[i] < s.copRespTime[j] + }) + vMax, vMin := s.copRespTime[size-1], s.copRespTime[0] + vP95 := s.copRespTime[size*19/20] + sum := 0.0 + for _, t := range s.copRespTime { + sum += float64(t) + } + vAvg := time.Duration(sum / float64(size)) + + sort.Slice(s.procKeys, func(i, j int) bool { + return s.procKeys[i] < s.procKeys[j] + }) + keyMax := s.procKeys[size-1] + keyP95 := s.procKeys[size*19/20] + buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95)) + if keyMax > 0 { + buf.WriteString(", max_proc_keys: ") + buf.WriteString(strconv.FormatInt(keyMax, 10)) + buf.WriteString(", p95_proc_keys: ") + buf.WriteString(strconv.FormatInt(keyP95, 10)) + } + if s.totalProcessTime > 0 { + buf.WriteString(", tot_proc: ") + buf.WriteString(s.totalProcessTime.String()) + if s.totalWaitTime > 0 { + buf.WriteString(", tot_wait: ") + buf.WriteString(s.totalWaitTime.String()) + } + } + } + } + copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] + if copRPC != nil && copRPC.Count > 0 { + delete(s.rpcStat.Stats, tikvrpc.CmdCop) + buf.WriteString(", rpc_num: ") + buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) + buf.WriteString(", rpc_time: ") + buf.WriteString(time.Duration(copRPC.Consume).String()) + } + buf.WriteString("}") + + rpcStatsStr := s.rpcStat.String() + if len(rpcStatsStr) > 0 { + buf.WriteString(", ") + buf.WriteString(rpcStatsStr) + } + + if len(s.backoffSleep) > 0 { + buf.WriteString(", backoff{") + idx := 0 + for k, d := range s.backoffSleep { + if idx > 0 { + buf.WriteString(", ") + } + idx++ + buf.WriteString(fmt.Sprintf("%s: %s", k, 
d.String())) + } + buf.WriteString("}") + } + return buf.String() +} diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index ada163df3859c..469d687517752 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -18,6 +18,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tipb/go-tipb" @@ -29,7 +30,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) sr.rootPlanID = 1234 - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() t := uint64(1) @@ -39,12 +40,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { }, } c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse) sr.copPlanIDs = []int{sr.rootPlanID} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1") } diff --git a/distsql/stream.go b/distsql/stream.go index 934eb28e16885..f1817084cdf44 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -106,11 +106,15 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons } r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts) r.partialCount++ - resultDetail := resultSubset.GetExecDetails() - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + copStats.CopTime = duration + r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil) + } } - r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil) return false, nil } diff --git a/kv/kv.go b/kv/kv.go index 3b814a8733096..314bb4ac4c296 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" ) @@ -341,8 +340,6 @@ type ResultSubset interface { GetData() []byte // GetStartKey gets the start key. GetStartKey() Key - // GetExecDetails gets the detail information. - GetExecDetails() *execdetails.ExecDetails // MemSize returns how many bytes of memory this result use for tracing memory usage. MemSize() int64 // RespTime returns the response time for the request. 
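Note: with GetExecDetails removed from kv.ResultSubset, cop runtime stats now travel through the optional distsql.CopRuntimeStats interface that fetchResp and readDataFromResponse type-assert above. Below is a minimal sketch of both sides of that contract; mockResp and consume are hypothetical names used only for illustration, and the sketch assumes ResultSubset requires only the methods visible in the kv.go hunk above (the real implementations are copResponse and batchCopResponse in store/tikv).

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
)

// mockResp is a hypothetical response type for illustration only.
type mockResp struct {
	data  []byte
	stats *tikv.CopRuntimeStats
}

func (m *mockResp) GetData() []byte         { return m.data }
func (m *mockResp) GetStartKey() kv.Key     { return nil }
func (m *mockResp) MemSize() int64          { return int64(cap(m.data)) }
func (m *mockResp) RespTime() time.Duration { return 0 }

// GetCopRuntimeStats opts mockResp into the new distsql.CopRuntimeStats interface.
func (m *mockResp) GetCopRuntimeStats() *tikv.CopRuntimeStats { return m.stats }

// consume mirrors the assertion pattern used by selectResult.fetchResp and
// streamResult.readDataFromResponse in this patch.
func consume(r kv.ResultSubset) {
	if hasStats, ok := r.(distsql.CopRuntimeStats); ok {
		if copStats := hasStats.GetCopRuntimeStats(); copStats != nil {
			fmt.Println(copStats.ProcessedKeys) // field from the embedded execdetails.ExecDetails
		}
	}
}

func main() {
	consume(&mockResp{stats: &tikv.CopRuntimeStats{}})
}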
diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 3cfc3005bae65..b29b604c5ea87 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -1030,12 +1030,6 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor analyzeInfo = "time:0ns, loops:0" actRows = "0" } - switch p.(type) { - case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: - if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 { - analyzeInfo += ", " + s.String() - } - } memoryInfo = "N/A" memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index 0ee09c06f24da..59db552d4a0de 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -42,7 +41,7 @@ type batchCopTask struct { type batchCopResponse struct { pbResp *coprocessor.BatchResponse - detail *execdetails.ExecDetails + detail *CopRuntimeStats // batch Cop Response is yet to return startKey. So batchCop cannot retry partially. startKey kv.Key @@ -63,7 +62,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. -func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { +func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats { return rs.detail } @@ -77,9 +76,6 @@ func (rs *batchCopResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. 
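Note: because getRuntimeInfo above no longer appends a separate ReaderRuntimeStats string, the cop-task summary for TableReader/IndexReader/IndexLookUpReader now comes from the selectResultRuntimeStats registered under the reader's plan ID (see select_result.go above). An illustrative execution-info fragment in the new String() format, with made-up numbers, assuming two cop tasks and one region-miss backoff:

	time:3ms, loops:2, cop_task: {num: 2, max: 2ms, min: 1ms, avg: 1.5ms, p95: 2ms, max_proc_keys: 100, p95_proc_keys: 100, tot_proc: 2ms, tot_wait: 1ms, rpc_num: 2, rpc_time: 3ms}, backoff{regionMiss: 2ms}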
@@ -304,7 +300,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { - resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)} + resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)} b.sendToRespCh(resp) break } @@ -415,7 +411,7 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro resp := batchCopResponse{ pbResp: response, - detail: new(execdetails.ExecDetails), + detail: new(CopRuntimeStats), } resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 8a936ae58197e..69e3d18a3cb11 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -433,7 +433,7 @@ type copIteratorTaskSender struct { type copResponse struct { pbResp *coprocessor.Response - detail *execdetails.ExecDetails + detail *CopRuntimeStats startKey kv.Key err error respSize int64 @@ -455,7 +455,7 @@ func (rs *copResponse) GetStartKey() kv.Key { return rs.startKey } -func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { +func (rs *copResponse) GetCopRuntimeStats() *CopRuntimeStats { return rs.detail } @@ -469,9 +469,6 @@ func (rs *copResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -775,6 +772,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) req.StoreTp = task.storeType startTime := time.Now() + worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType, task.storeAddr) if err != nil { if task.storeType == kv.TiDB { @@ -840,7 +838,7 @@ type clientHelper struct { *minCommitTSPushed Client resolveLite bool - stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + RegionRequestRuntimeStats } // ResolveLocks wraps the ResolveLocks function and store the resolved result. 
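Note: the clientHelper and RegionRequestSender changes route every RPC through the same record-on-return pattern: initialize the Stats map, then wrap each send in a deferred closure that accumulates count and elapsed time per command type. The following is a self-contained mirror of that pattern built only from the exported types added in this patch; timedSend is a hypothetical helper, not part of the patch, and recordRegionRequestRuntimeStats itself stays unexported.

package example

import (
	"time"

	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// timedSend mirrors what recordRegionRequestRuntimeStats does inside
// sendReqToRegion/ResolveLocks: bump the per-command counter and add the
// elapsed time once the wrapped send returns.
func timedSend(stats tikv.RegionRequestRuntimeStats, cmd tikvrpc.CmdType, send func() error) error {
	if stats.Stats != nil {
		defer func(start time.Time) {
			s, ok := stats.Stats[cmd]
			if !ok {
				s = &tikv.RPCRuntimeStats{}
				stats.Stats[cmd] = s
			}
			s.Count++
			s.Consume += int64(time.Since(start))
		}(time.Now())
	}
	return send()
}

A caller would create the map with tikv.NewRegionRequestRuntimeStats() and pass, say, tikvrpc.CmdCop for coprocessor requests; leaving Stats nil disables collection, matching the `if s.Stats != nil` guards in the diff.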
@@ -848,9 +846,9 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 - if ch.stats != nil { + if ch.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start)) + recordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } if ch.resolveLite { @@ -874,7 +872,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID if len(directStoreAddr) > 0 { sender.storeAddr = directStoreAddr } - sender.stats = ch.stats + sender.Stats = ch.Stats req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.storeAddr, err @@ -1023,8 +1021,9 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon resp.startKey = task.ranges.at(0).StartKey } if resp.detail == nil { - resp.detail = new(execdetails.ExecDetails) + resp.detail = new(CopRuntimeStats) } + resp.detail.Stats = worker.Stats resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) @@ -1078,6 +1077,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon return nil, nil } +// CopRuntimeStats contains execution detail information. +type CopRuntimeStats struct { + execdetails.ExecDetails + RegionRequestRuntimeStats +} + func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error { errCode := errno.ErrUnknown errMsg := err.Error() @@ -1101,7 +1106,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, pbResp: &coprocessor.Response{ Data: data, }, - detail: &execdetails.ExecDetails{}, + detail: &CopRuntimeStats{}, } worker.sendToRespCh(resp, ch, true) return nil diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 4e4a3bab710e5..57ba5f0df8d90 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -14,7 +14,9 @@ package tikv import ( + "bytes" "context" + "fmt" "strconv" "sync" "sync/atomic" @@ -61,14 +63,54 @@ type RegionRequestSender struct { storeAddr string rpcError error failStoreIDs map[uint64]struct{} - stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + RegionRequestRuntimeStats } // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - count int64 + Stats map[tikvrpc.CmdType]*RPCRuntimeStats +} + +// NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. +func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { + return RegionRequestRuntimeStats{ + Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), + } +} + +// RPCRuntimeStats indicates the RPC request count and consume time. +type RPCRuntimeStats struct { + Count int64 // Send region request consume time. - consume int64 + Consume int64 +} + +// String implements fmt.Stringer interface. +func (r *RegionRequestRuntimeStats) String() string { + var buf bytes.Buffer + for k, v := range r.Stats { + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.Count, time.Duration(v.Consume))) + } + return buf.String() +} + +// Merge merges other RegionRequestRuntimeStats. 
+func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { + for cmd, v := range rs.Stats { + stat, ok := r.Stats[cmd] + if !ok { + r.Stats[cmd] = &RPCRuntimeStats{ + Count: v.Count, + Consume: v.Consume, + } + continue + } + stat.Count += v.Count + stat.Consume += v.Consume + } } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. @@ -92,9 +134,9 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } - if ss.stats != nil { + if ss.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) }(time.Now()) } resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) @@ -111,17 +153,17 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co return } -func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { +func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { stat, ok := stats[cmd] if !ok { - stats[cmd] = &RegionRequestRuntimeStats{ - count: 1, - consume: int64(d), + stats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), } return } - stat.count++ - stat.consume += int64(d) + stat.Count++ + stat.Consume += int64(d) } func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { @@ -344,9 +386,10 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, } defer s.releaseStoreToken(rpcCtx.Store) } - if s.stats != nil { + + if s.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) }(time.Now()) } ctx := bo.ctx diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 1b812392b1426..f78a9720fecf0 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -236,9 +236,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll Client: s.store.client, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -364,9 +364,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { resolveLite: true, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -591,30 +591,30 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { } } -func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { +func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats == nil { - s.mu.stats.rpcStats = stats + if s.mu.stats.rpcStats.Stats == nil { + s.mu.stats.rpcStats.Stats = stats return } for k, v := range stats { - stat, ok := s.mu.stats.rpcStats[k] + stat, ok := s.mu.stats.rpcStats.Stats[k] if !ok { - 
s.mu.stats.rpcStats[k] = v + s.mu.stats.rpcStats.Stats[k] = v continue } - stat.count += v.count - stat.consume += v.consume + stat.Count += v.Count + stat.Consume += v.Consume } } // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + rpcStats RegionRequestRuntimeStats backoffSleepMS map[backoffType]int backoffTimes map[backoffType]int } @@ -622,12 +622,7 @@ type SnapshotRuntimeStats struct { // String implements fmt.Stringer interface. func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - for k, v := range rs.rpcStats { - if buf.Len() > 0 { - buf.WriteByte(',') - } - buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume))) - } + buf.WriteString(rs.rpcStats.String()) for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 36fa86645ba9f..518ef35925951 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -303,13 +303,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { - reqStats := make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Second) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Millisecond) + reqStats := NewRegionRequestRuntimeStats() + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) - snapshot.mergeRegionRequestStats(reqStats) - snapshot.mergeRegionRequestStats(reqStats) + snapshot.mergeRegionRequestStats(reqStats.Stats) + snapshot.mergeRegionRequestStats(reqStats.Stats) bo := NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleep(boTxnLockFast, 30, errors.New("test")) c.Assert(err, IsNil) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 9bbc09fbdf40c..8f480e1354a20 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -323,49 +323,6 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks) } -// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader -type ReaderRuntimeStats struct { - sync.Mutex - - copRespTime []time.Duration - procKeys []int64 -} - -// recordOneCopTask record once cop response time to update maxcopRespTime -func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration, detail *ExecDetails) { - rrs.Lock() - defer rrs.Unlock() - rrs.copRespTime = append(rrs.copRespTime, t) - rrs.procKeys = append(rrs.procKeys, detail.ProcessedKeys) -} - -func (rrs *ReaderRuntimeStats) String() string { - size := len(rrs.copRespTime) - if size == 0 { - return "" - } - if size == 1 { - return fmt.Sprintf("rpc num: 1, rpc time:%v, proc keys:%v", rrs.copRespTime[0], rrs.procKeys[0]) - } - sort.Slice(rrs.copRespTime, func(i, j int) bool { - return rrs.copRespTime[i] < rrs.copRespTime[j] - }) - vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0] - vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20] - sum := 0.0 - for _, t := range rrs.copRespTime { - sum += float64(t) - } 
- vAvg := time.Duration(sum / float64(size)) - - sort.Slice(rrs.procKeys, func(i, j int) bool { - return rrs.procKeys[i] < rrs.procKeys[j] - }) - keyMax := rrs.procKeys[size-1] - keyP95 := rrs.procKeys[size*19/20] - return fmt.Sprintf("rpc num: %v, rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v, proc keys max:%v, p95:%v", size, vMax, vMin, vAvg, vP80, vP95, keyMax, keyP95) -} - // RuntimeStats is used to express the executor runtime information. type RuntimeStats interface { GetActRows() int64 @@ -406,16 +363,15 @@ func (e *BasicRuntimeStats) String() string { // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { - mu sync.Mutex - rootStats map[int]RuntimeStats - copStats map[int]*CopRuntimeStats - readerStats map[int]*ReaderRuntimeStats + mu sync.Mutex + rootStats map[int]RuntimeStats + copStats map[int]*CopRuntimeStats } // NewRuntimeStatsColl creates new executor collector. func NewRuntimeStatsColl() *RuntimeStatsColl { return &RuntimeStatsColl{rootStats: make(map[int]RuntimeStats), - copStats: make(map[int]*CopRuntimeStats), readerStats: make(map[int]*ReaderRuntimeStats)} + copStats: make(map[int]*CopRuntimeStats)} } // RegisterStats register execStat for a executor. @@ -455,12 +411,6 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, address string, summary copStats.RecordOneCopTask(address, summary) } -// RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. -func (e *RuntimeStatsColl) RecordOneReaderStats(planID int, copRespTime time.Duration, detail *ExecDetails) { - readerStats := e.GetReaderStats(planID) - readerStats.recordOneCopTask(copRespTime, detail) -} - // ExistsRootStats checks if the planID exists in the rootStats collection. func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool { e.mu.Lock() @@ -477,18 +427,6 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool { return exists } -// GetReaderStats gets the ReaderRuntimeStats specified by planID. -func (e *RuntimeStatsColl) GetReaderStats(planID int) *ReaderRuntimeStats { - e.mu.Lock() - defer e.mu.Unlock() - stats, exists := e.readerStats[planID] - if !exists { - stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)} - e.readerStats[planID] = stats - } - return stats -} - // ConcurrencyInfo is used to save the concurrency information of the executor operator type ConcurrencyInfo struct { concurrencyName string diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index e59b1b80df84b..65a5c5b9797f4 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -114,24 +114,3 @@ func TestCopRuntimeStats(t *testing.T) { t.Fatal("table_reader not exists") } } - -func TestReaderStats(t *testing.T) { - r := new(ReaderRuntimeStats) - if r.String() != "" { - t.Fatal() - } - - r.procKeys = append(r.procKeys, 100) - r.copRespTime = append(r.copRespTime, time.Millisecond*100) - if r.String() != "rpc num: 1, rpc time:100ms, proc keys:100" { - t.Fatal() - } - - for i := 0; i < 100; i++ { - r.procKeys = append(r.procKeys, int64(i)) - r.copRespTime = append(r.copRespTime, time.Millisecond*time.Duration(i)) - } - if r.String() != "rpc num: 101, rpc max:100ms, min:0s, avg:50ms, p80:80ms, p95:95ms, proc keys max:100, p95:95" { - t.Fatal() - } -}
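Note: a hedged usage sketch of the exported surface added in region_request.go (NewRegionRequestRuntimeStats, RPCRuntimeStats, Merge, String), which is how selectResultRuntimeStats aggregates per-task RPC stats; the values are made up and the printed ordering depends on map iteration.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

func main() {
	total := tikv.NewRegionRequestRuntimeStats()
	total.Stats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{Count: 2, Consume: int64(3 * time.Millisecond)}

	perTask := tikv.NewRegionRequestRuntimeStats()
	perTask.Stats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{Count: 1, Consume: int64(time.Millisecond)}
	perTask.Stats[tikvrpc.CmdGet] = &tikv.RPCRuntimeStats{Count: 1, Consume: int64(2 * time.Millisecond)}

	// Merge adds counts and consumed time per command type, creating entries as needed.
	total.Merge(perTask)

	// Each entry renders as "<cmd>:{num_rpc:<count>, total_time:<duration>}".
	fmt.Println(total.String())
}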