Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor:add runtime information for StreamAggExec #20861

Closed
wants to merge 12 commits into from
71 changes: 69 additions & 2 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -835,6 +836,7 @@ type StreamAggExec struct {
childResult *chunk.Chunk

memTracker *memory.Tracker // track memory usage.
stats *StreamAggRuntimeStat
}

// Open implements the Executor Open interface.
Expand All @@ -847,7 +849,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
e.inputRow = e.inputIter.End()

e.initRuntimeStat()
e.partialResults = make([]aggfuncs.PartialResult, 0, len(e.aggFuncs))
for _, aggFunc := range e.aggFuncs {
partialResult, memDelta := aggFunc.AllocPartialResult()
Expand Down Expand Up @@ -884,12 +886,17 @@ func (e *StreamAggExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
}

func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) (err error) {
var start time.Time
if e.groupChecker.isExhausted() {
if err = e.consumeCurGroupRowsAndFetchChild(ctx, chk); err != nil {
return err
}
if !e.executed {
start = time.Now()
_, err := e.groupChecker.splitIntoGroups(e.childResult)
if e.stats != nil {
e.stats.SplitTime += time.Since(start)
}
if err != nil {
return err
}
Expand All @@ -907,8 +914,11 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) (
if err = e.consumeCurGroupRowsAndFetchChild(ctx, chk); err != nil || e.executed {
return err
}

start = time.Now()
isFirstGroupSameAsPrev, err := e.groupChecker.splitIntoGroups(e.childResult)
if e.stats != nil {
e.stats.SplitTime += time.Since(start)
}
if err != nil {
return err
}
Expand Down Expand Up @@ -948,6 +958,7 @@ func (e *StreamAggExec) consumeGroupRows() error {

func (e *StreamAggExec) consumeCurGroupRowsAndFetchChild(ctx context.Context, chk *chunk.Chunk) (err error) {
// Before fetching a new batch of input, we should consume the last group.
start := time.Now()
err = e.consumeGroupRows()
if err != nil {
return err
Expand All @@ -973,6 +984,9 @@ func (e *StreamAggExec) consumeCurGroupRowsAndFetchChild(ctx context.Context, ch
// Reach here, "e.childrenResults[0].NumRows() > 0" is guaranteed.
e.isChildReturnEmpty = false
e.inputRow = e.inputIter.Begin()
if e.stats != nil {
e.stats.AllocateTime += time.Since(start)
}
return nil
}

Expand All @@ -992,6 +1006,59 @@ func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error {
return nil
}

func (e *StreamAggExec) initRuntimeStat() {
if e.runtimeStats != nil {
if e.stats == nil {
e.stats = &StreamAggRuntimeStat{
AllocateTime: 0,
SplitTime: 0,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}
}

// StreamAggRuntimeStat record the StreamAgg runtime stat
type StreamAggRuntimeStat struct {
AllocateTime time.Duration
SplitTime time.Duration
}

func (e *StreamAggRuntimeStat) String() string {
var result bytes.Buffer
if e.AllocateTime != 0 {
result.WriteString(fmt.Sprintf("allocate_time:%v", e.AllocateTime))
}
if e.SplitTime != 0 {
if result.Len() > 0 {
result.WriteByte(',')
}
result.WriteString(fmt.Sprintf(" split_time:%v", e.SplitTime))
}
return result.String()
}

// Clone implements the RuntimeStats interface.
func (e *StreamAggRuntimeStat) Clone() execdetails.RuntimeStats {
newRs := *e
return &newRs
}

// Merge implements the RuntimeStats interface.
func (e *StreamAggRuntimeStat) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*StreamAggRuntimeStat)
if !ok {
return
}
e.AllocateTime += tmp.AllocateTime
e.SplitTime += tmp.SplitTime
}

// Tp implements the RuntimeStats interface.
func (e *StreamAggRuntimeStat) Tp() int {
return execdetails.TpStreamAggRuntimeStat
}

// vecGroupChecker is used to split a given chunk according to the `group by` expression in a vectorized manner
// It is usually used for streamAgg
type vecGroupChecker struct {
Expand Down
13 changes: 13 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6527,6 +6527,19 @@ func (s *testSerialSuite1) TestCollectCopRuntimeStats(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult"), IsNil)
}

func (s *testSerialSuite1) TestStreamAggRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int)")
tk.MustExec("insert into t1 values (1,2),(2,3),(3,4)")
sql := "explain analyze SELECT /*+ STREAM_AGG() */ sum(a) FROM t1 WHERE a < 10;"
rows := tk.MustQuery(sql).Rows()
c.Assert(len(rows), Equals, 5)
explain := fmt.Sprintf("%v", rows[0])
c.Assert(explain, Matches, ".*time:.*loops:.*allocate_time:.*split_time:.*")
}

func (s *testSerialSuite1) TestIndexlookupRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
Expand Down
2 changes: 2 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ const (
TpIndexLookUpRunTimeStats
// TpSlowQueryRuntimeStat is the tp for TpSlowQueryRuntimeStat
TpSlowQueryRuntimeStat
// TpStreamAggRuntimeStat is the tp for StreamAggRuntimeStat
TpStreamAggRuntimeStat
)

// RuntimeStats is used to express the executor runtime information.
Expand Down