Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics, executor: refactor statistics on columns (#21817) #22007

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
switch task.taskType {
case colTask:
task.colExec.job = task.job
resultCh <- analyzeColumnsPushdown(task.colExec)
for _, result := range analyzeColumnsPushdown(task.colExec) {
resultCh <- result
}
case idxTask:
task.idxExec.job = task.job
resultCh <- analyzeIndexPushdown(task.idxExec)
Expand Down Expand Up @@ -400,7 +402,7 @@ func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool)
return hist, cms, topN, nil
}

func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
var ranges []*ranger.Range
if hc := colExec.handleCols; hc != nil {
if hc.IsInt() {
Expand All @@ -413,7 +415,31 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
}
hists, cms, topNs, extStats, err := colExec.buildStats(ranges, true)
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
return []analyzeResult{{Err: err, job: colExec.job}}
}

if hasPkHist(colExec.handleCols) {
PKresult := analyzeResult{
TableID: colExec.tableID,
Hist: hists[:1],
Cms: cms[:1],
TopNs: topNs[:1],
ExtStats: nil,
job: nil,
StatsVer: statistics.Version1,
}
PKresult.Count = int64(PKresult.Hist[0].TotalRowCount())
restResult := analyzeResult{
TableID: colExec.tableID,
Hist: hists[1:],
Cms: cms[1:],
TopNs: topNs[1:],
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.analyzeVer,
}
restResult.Count = PKresult.Count
return []analyzeResult{PKresult, restResult}
}
result := analyzeResult{
TableID: colExec.tableID,
Expand All @@ -422,14 +448,13 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
TopNs: topNs,
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.ctx.GetSessionVars().AnalyzeVersion,
StatsVer: colExec.analyzeVer,
}
hist := hists[0]
result.Count = hist.NullCount
if hist.Len() > 0 {
result.Count += hist.Buckets[hist.Len()-1].Count
result.Count = int64(result.Hist[0].TotalRowCount())
if result.StatsVer == statistics.Version2 {
result.Count += int64(topNs[0].TotalCount())
}
return result
return []analyzeResult{result}
}

// AnalyzeColumnsExec represents Analyze columns push down executor.
Expand All @@ -444,6 +469,7 @@ type AnalyzeColumnsExec struct {
resultHandler *tableResultHandler
opts map[ast.AnalyzeOptionType]uint64
job *statistics.AnalyzeJob
analyzeVer int
}

func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
Expand Down Expand Up @@ -555,11 +581,14 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
topNs = append(topNs, nil)
}
for i, col := range e.colsInfo {
err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
if e.analyzeVer < 2 {
// In analyze version 2, we don't collect TopN this way. We will collect TopN from samples in `BuildColumnHistAndTopN()` below.
err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
}
topNs = append(topNs, collectors[i].TopN)
}
topNs = append(topNs, collectors[i].TopN)
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
Expand All @@ -572,7 +601,15 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
collectors[i].Samples[j].Value.SetBytes(collectors[i].Samples[j].Value.GetBytes())
}
}
hg, err := statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
var hg *statistics.Histogram
var err error
var topn *statistics.TopN
if e.analyzeVer < 2 {
hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
} else {
hg, topn, err = statistics.BuildColumnHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType)
topNs = append(topNs, topn)
}
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down
11 changes: 10 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (s *testSuite1) TestExtractTopN(c *C) {
tblInfo := table.Meta()
tblStats := s.dom.StatsHandle().GetTableStats(tblInfo)
colStats := tblStats.Columns[tblInfo.Columns[1].ID]
c.Assert(len(colStats.TopN.TopN), Equals, 1)
c.Assert(len(colStats.TopN.TopN), Equals, 10)
item := colStats.TopN.TopN[0]
c.Assert(item.Count, Equals, uint64(11))
idxStats := tblStats.Indices[tblInfo.Indices[0].ID]
Expand All @@ -696,6 +696,15 @@ func (s *testSuite1) TestExtractTopN(c *C) {
c.Assert(idxItem.Count, Equals, uint64(11))
// The columns are: DBName, table name, column name, is index, value, count.
tk.MustQuery("show stats_topn").Sort().Check(testkit.Rows("test t b 0 0 11",
"test t b 0 1 1",
"test t b 0 2 1",
"test t b 0 3 1",
"test t b 0 4 1",
"test t b 0 5 1",
"test t b 0 6 1",
"test t b 0 7 1",
"test t b 0 8 1",
"test t b 0 9 1",
"test t index_b 1 0 11",
"test t index_b 1 1 1",
"test t index_b 1 2 1",
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,7 +2020,8 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
Flags: sc.PushDownFlags(),
TimeZoneOffset: offset,
},
opts: opts,
opts: opts,
analyzeVer: b.ctx.GetSessionVars().AnalyzeVersion,
}
depth := int32(opts[ast.AnalyzeOptCMSketchDepth])
width := int32(opts[ast.AnalyzeOptCMSketchWidth])
Expand Down
12 changes: 12 additions & 0 deletions statistics/analyze_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func GetAllAnalyzeJobs() []*AnalyzeJob {

// Start marks the status of the analyze job as running and updates the start time.
func (job *AnalyzeJob) Start() {
if job == nil {
return
}
job.Mutex.Lock()
job.State = running
now := time.Now()
Expand All @@ -102,6 +105,9 @@ func (job *AnalyzeJob) Start() {

// Update updates the row count of the analyze job.
func (job *AnalyzeJob) Update(rowCount int64) {
if job == nil {
return
}
job.Mutex.Lock()
job.RowCount += rowCount
job.updateTime = time.Now()
Expand All @@ -110,6 +116,9 @@ func (job *AnalyzeJob) Update(rowCount int64) {

// Finish updates the status of the analyze job to finished or failed according to `meetError`.
func (job *AnalyzeJob) Finish(meetError bool) {
if job == nil {
return
}
job.Mutex.Lock()
if meetError {
job.State = failed
Expand All @@ -121,6 +130,9 @@ func (job *AnalyzeJob) Finish(meetError bool) {
}

func (job *AnalyzeJob) getUpdateTime() time.Time {
if job == nil {
return time.Time{}
}
job.Mutex.Lock()
defer job.Mutex.Unlock()
return job.updateTime
Expand Down
Loading