Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: merge the runtime stats in time to avoid using too many memory #39394

Merged
merged 19 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestSelectWithRuntimeStats(t *testing.T) {
}

func TestSelectResultRuntimeStats(t *testing.T) {
basic := &execdetails.BasicRuntimeStats{}
stmtStats := execdetails.NewRuntimeStatsColl(nil)
basic := stmtStats.GetBasicRuntimeStats(1)
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
Expand All @@ -120,8 +121,6 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}

s2 := *s1
stmtStats := execdetails.NewRuntimeStatsColl(nil)
stmtStats.RegisterStats(1, basic)
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
Expand All @@ -136,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "time:0s, loops:0, cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
Expand Down
5 changes: 3 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}

if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

Expand Down Expand Up @@ -456,6 +454,9 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use defer here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use defer to ensure that the Close()(make sure all goroutines have been exit in some parallel executer) is finished before registering stats.

}
return r.resp.Close()
}

Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestUpdateCopRuntimeStats(t *testing.T) {
require.Nil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)

sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "a"}}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)
i := uint64(1)
Expand All @@ -46,13 +46,13 @@ func TestUpdateCopRuntimeStats(t *testing.T) {

require.NotEqual(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.False(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234))

sr.copPlanIDs = []int{sr.rootPlanID}
require.NotNil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)
require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String())
}
4 changes: 3 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, p

// Close implements the Executor Close interface.
func (e *HashAggExec) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.isUnparallelExec {
var firstErr error
e.childResult = nil
Expand Down Expand Up @@ -1131,7 +1134,6 @@ func (e *HashAggExec) initRuntimeStats() {
stats.PartialStats = make([]*AggWorkerStat, 0, stats.PartialConcurrency)
stats.FinalStats = make([]*AggWorkerStat, 0, stats.FinalConcurrency)
e.stats = stats
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down
3 changes: 3 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ func MockNewCacheTableSnapShot(snapshot kv.Snapshot, memBuffer kv.MemBuffer) *ca

// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
Expand Down
1 change: 0 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4906,7 +4906,6 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}

if plan.IndexInfo != nil {
Expand Down
6 changes: 4 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
e.kvRanges = e.kvRanges[:0]
if e.dummy {
return nil
Expand Down Expand Up @@ -808,7 +811,6 @@ func (e *IndexLookUpExecutor) initRuntimeStats() {
indexScanBasicStats: &execdetails.BasicRuntimeStats{},
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down Expand Up @@ -876,7 +878,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats)
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID)
}
}
for {
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
e.runtimeStats = &execdetails.BasicRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, e.runtimeStats)
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
if schema != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
return nil
Expand Down Expand Up @@ -288,6 +287,9 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.finished.Store(false)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.cancelFunc = nil
return nil
Expand Down Expand Up @@ -765,6 +764,9 @@ func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool {

// Close implements the Executor interface.
func (e *IndexLookUpJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
3 changes: 3 additions & 0 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *loo

// Close implements the Executor interface.
func (e *IndexLookUpMergeJoin) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.runtimeStats)
}
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
Expand Down
7 changes: 4 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ func (e *IndexMergeReaderExecutor) initRuntimeStats() {
e.stats = &IndexMergeRuntimeStat{
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down Expand Up @@ -704,6 +703,9 @@ func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context

// Close implements Exec Close interface.
func (e *IndexMergeReaderExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.finished == nil {
return nil
}
Expand Down Expand Up @@ -821,8 +823,7 @@ func (w *partialIndexWorker) fetchHandles(
var basicStats *execdetails.BasicRuntimeStats
if w.stats != nil {
if w.idxID != 0 {
basicStats = &execdetails.BasicRuntimeStats{}
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.idxID, basicStats)
basicStats = w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID)
}
}
for {
Expand Down
3 changes: 3 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
if e.runtimeStats != nil && e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
defer e.memTracker.ReplaceBytesUsed(0)
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
Expand Down
1 change: 0 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
for _, fkc := range e.fkChecks {
fkc.stats = e.stats.FKCheckStats
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
Expand Down
17 changes: 15 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (e *HashJoinExec) Close() error {
if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
err := e.baseExecutor.Close()
return err
}
Expand Down Expand Up @@ -210,7 +213,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.stats = &hashJoinRuntimeStats{
concurrent: int(e.concurrency),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return nil
}
Expand Down Expand Up @@ -1271,7 +1273,6 @@ func (e *NestedLoopApplyExec) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.canUseCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -1282,6 +1283,7 @@ func (e *NestedLoopApplyExec) Close() error {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.outerExec.Close()
}
Expand Down Expand Up @@ -1539,6 +1541,17 @@ func (e *joinRuntimeStats) Tp() int {
return execdetails.TpJoinRuntimeStats
}

func (e *joinRuntimeStats) Clone() execdetails.RuntimeStats {
newJRS := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: e.RuntimeStatsWithConcurrencyInfo,
applyCache: e.applyCache,
cache: e.cache,
hasHashStat: e.hasHashStat,
hashStat: e.hashStat,
}
return newJRS
}

type hashJoinRuntimeStats struct {
fetchAndBuildHashTable time.Duration
hashStat hashStatistic
Expand Down
3 changes: 3 additions & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataInfo.stats)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (e *ParallelNestedLoopApplyExec) Close() error {

if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.useCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -187,6 +186,7 @@ func (e *ParallelNestedLoopApplyExec) Close() error {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", e.concurrency))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return err
}
Expand Down
4 changes: 3 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}

if p.IndexInfo != nil {
Expand Down Expand Up @@ -194,6 +193,9 @@ func (e *PointGetExecutor) Open(context.Context) error {

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
Expand Down
3 changes: 3 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type ReplaceExec struct {
// Close implements the Executor Close interface.
func (e *ReplaceExec) Close() error {
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.SelectExec != nil {
return e.SelectExec.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (e *UpdateExec) Close() error {
if err == nil && txn.Valid() && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil)
}
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return e.children[0].Close()
}
Expand Down Expand Up @@ -463,7 +464,6 @@ func (e *UpdateExec) collectRuntimeStatsEnabled() bool {
SnapshotRuntimeStats: &txnsnapshot.SnapshotRuntimeStats{},
AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
Expand Down
Loading