From 74a1fb690367211d397e5adbdb11126905b68c73 Mon Sep 17 00:00:00 2001 From: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com> Date: Thu, 24 Dec 2020 13:56:30 +0800 Subject: [PATCH] statistics, executor: refactor statistics on columns (#21817) --- executor/analyze.go | 65 +++++++++--- executor/analyze_test.go | 11 +- executor/builder.go | 3 +- statistics/analyze_jobs.go | 12 +++ statistics/builder.go | 179 ++++++++++++++++++++++++++++++--- statistics/handle/dump.go | 11 +- statistics/handle/dump_test.go | 2 +- statistics/handle/handle.go | 42 +++++++- statistics/histogram.go | 130 +++++++++++++++++++++--- statistics/statistics_test.go | 26 ++++- statistics/table.go | 5 +- 11 files changed, 431 insertions(+), 55 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 091abb8575c74..58d84dc459d3a 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -199,7 +199,9 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- switch task.taskType { case colTask: task.colExec.job = task.job - resultCh <- analyzeColumnsPushdown(task.colExec) + for _, result := range analyzeColumnsPushdown(task.colExec) { + resultCh <- result + } case idxTask: task.idxExec.job = task.job resultCh <- analyzeIndexPushdown(task.idxExec) @@ -400,7 +402,7 @@ func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) return hist, cms, topN, nil } -func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult { +func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult { var ranges []*ranger.Range if hc := colExec.handleCols; hc != nil { if hc.IsInt() { @@ -413,7 +415,31 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult { } hists, cms, topNs, extStats, err := colExec.buildStats(ranges, true) if err != nil { - return analyzeResult{Err: err, job: colExec.job} + return []analyzeResult{{Err: err, job: colExec.job}} + } + + if hasPkHist(colExec.handleCols) { + PKresult := analyzeResult{ + TableID: colExec.tableID, + Hist: hists[:1], + Cms: cms[:1], + TopNs: topNs[:1], + ExtStats: nil, + job: nil, + StatsVer: statistics.Version1, + } + PKresult.Count = int64(PKresult.Hist[0].TotalRowCount()) + restResult := analyzeResult{ + TableID: colExec.tableID, + Hist: hists[1:], + Cms: cms[1:], + TopNs: topNs[1:], + ExtStats: extStats, + job: colExec.job, + StatsVer: colExec.analyzeVer, + } + restResult.Count = PKresult.Count + return []analyzeResult{PKresult, restResult} } result := analyzeResult{ TableID: colExec.tableID, @@ -422,14 +448,13 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult { TopNs: topNs, ExtStats: extStats, job: colExec.job, - StatsVer: colExec.ctx.GetSessionVars().AnalyzeVersion, + StatsVer: colExec.analyzeVer, } - hist := hists[0] - result.Count = hist.NullCount - if hist.Len() > 0 { - result.Count += hist.Buckets[hist.Len()-1].Count + result.Count = int64(result.Hist[0].TotalRowCount()) + if result.StatsVer == statistics.Version2 { + result.Count += int64(topNs[0].TotalCount()) } - return result + return []analyzeResult{result} } // AnalyzeColumnsExec represents Analyze columns push down executor. @@ -444,6 +469,7 @@ type AnalyzeColumnsExec struct { resultHandler *tableResultHandler opts map[ast.AnalyzeOptionType]uint64 job *statistics.AnalyzeJob + analyzeVer int } func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error { @@ -555,11 +581,14 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo topNs = append(topNs, nil) } for i, col := range e.colsInfo { - err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone) - if err != nil { - return nil, nil, nil, nil, err + if e.analyzeVer < 2 { + // In analyze version 2, we don't collect TopN this way. We will collect TopN from samples in `BuildColumnHistAndTopN()` below. + err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone) + if err != nil { + return nil, nil, nil, nil, err + } + topNs = append(topNs, collectors[i].TopN) } - topNs = append(topNs, collectors[i].TopN) for j, s := range collectors[i].Samples { collectors[i].Samples[j].Ordinal = j collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone) @@ -572,7 +601,15 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo collectors[i].Samples[j].Value.SetBytes(collectors[i].Samples[j].Value.GetBytes()) } } - hg, err := statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType) + var hg *statistics.Histogram + var err error + var topn *statistics.TopN + if e.analyzeVer < 2 { + hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType) + } else { + hg, topn, err = statistics.BuildColumnHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType) + topNs = append(topNs, topn) + } if err != nil { return nil, nil, nil, nil, err } diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 0c26ef3de110c..f8fdf69f56be5 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -687,7 +687,7 @@ func (s *testSuite1) TestExtractTopN(c *C) { tblInfo := table.Meta() tblStats := s.dom.StatsHandle().GetTableStats(tblInfo) colStats := tblStats.Columns[tblInfo.Columns[1].ID] - c.Assert(len(colStats.TopN.TopN), Equals, 1) + c.Assert(len(colStats.TopN.TopN), Equals, 10) item := colStats.TopN.TopN[0] c.Assert(item.Count, Equals, uint64(11)) idxStats := tblStats.Indices[tblInfo.Indices[0].ID] @@ -696,6 +696,15 @@ func (s *testSuite1) TestExtractTopN(c *C) { c.Assert(idxItem.Count, Equals, uint64(11)) // The columns are: DBName, table name, column name, is index, value, count. tk.MustQuery("show stats_topn").Sort().Check(testkit.Rows("test t b 0 0 11", + "test t b 0 1 1", + "test t b 0 2 1", + "test t b 0 3 1", + "test t b 0 4 1", + "test t b 0 5 1", + "test t b 0 6 1", + "test t b 0 7 1", + "test t b 0 8 1", + "test t b 0 9 1", "test t index_b 1 0 11", "test t index_b 1 1 1", "test t index_b 1 2 1", diff --git a/executor/builder.go b/executor/builder.go index a2ea15b4438af..d65856847a97a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2020,7 +2020,8 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo Flags: sc.PushDownFlags(), TimeZoneOffset: offset, }, - opts: opts, + opts: opts, + analyzeVer: b.ctx.GetSessionVars().AnalyzeVersion, } depth := int32(opts[ast.AnalyzeOptCMSketchDepth]) width := int32(opts[ast.AnalyzeOptCMSketchWidth]) diff --git a/statistics/analyze_jobs.go b/statistics/analyze_jobs.go index ded4b7d66225d..18fe32c4e3d3a 100644 --- a/statistics/analyze_jobs.go +++ b/statistics/analyze_jobs.go @@ -92,6 +92,9 @@ func GetAllAnalyzeJobs() []*AnalyzeJob { // Start marks status of the analyze job as running and update the start time. func (job *AnalyzeJob) Start() { + if job == nil { + return + } job.Mutex.Lock() job.State = running now := time.Now() @@ -102,6 +105,9 @@ func (job *AnalyzeJob) Start() { // Update updates the row count of analyze job. func (job *AnalyzeJob) Update(rowCount int64) { + if job == nil { + return + } job.Mutex.Lock() job.RowCount += rowCount job.updateTime = time.Now() @@ -110,6 +116,9 @@ func (job *AnalyzeJob) Update(rowCount int64) { // Finish update the status of analyze job to finished or failed according to `meetError`. func (job *AnalyzeJob) Finish(meetError bool) { + if job == nil { + return + } job.Mutex.Lock() if meetError { job.State = failed @@ -121,6 +130,9 @@ func (job *AnalyzeJob) Finish(meetError bool) { } func (job *AnalyzeJob) getUpdateTime() time.Time { + if job == nil { + return time.Time{} + } job.Mutex.Lock() defer job.Mutex.Unlock() return job.updateTime diff --git a/statistics/builder.go b/statistics/builder.go index 45ae0a5678c3f..0c2182bc7c7af 100644 --- a/statistics/builder.go +++ b/statistics/builder.go @@ -14,10 +14,13 @@ package statistics import ( + "bytes" + "github.com/pingcap/errors" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" ) // SortedBuilder is used to build histograms for PK and index. @@ -116,26 +119,38 @@ func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *Sa } hg := NewHistogram(id, ndv, nullCount, 0, tp, int(numBuckets), collector.TotalSize) + corrXYSum, err := buildHist(sc, hg, samples, count, ndv, numBuckets) + if err != nil { + return nil, err + } + hg.Correlation = calcCorrelation(int64(len(samples)), corrXYSum) + return hg, nil +} + +// buildHist builds histogram from samples and other information. +// It stores the built histogram in hg and return corrXYSum used for calculating the correlation. +func buildHist(sc *stmtctx.StatementContext, hg *Histogram, samples []*SampleItem, count, ndv, numBuckets int64) (corrXYSum float64, err error) { sampleNum := int64(len(samples)) // As we use samples to build the histogram, the bucket number and repeat should multiply a factor. - sampleFactor := float64(count) / float64(len(samples)) + sampleFactor := float64(count) / float64(sampleNum) + ndvFactor := float64(count) / float64(ndv) + if ndvFactor > sampleFactor { + ndvFactor = sampleFactor + } // Since bucket count is increased by sampleFactor, so the actual max values per bucket is // floor(valuesPerBucket/sampleFactor)*sampleFactor, which may less than valuesPerBucket, // thus we need to add a sampleFactor to avoid building too many buckets. valuesPerBucket := float64(count)/float64(numBuckets) + sampleFactor - ndvFactor := float64(count) / float64(hg.NDV) - if ndvFactor > sampleFactor { - ndvFactor = sampleFactor - } + bucketIdx := 0 var lastCount int64 - var corrXYSum float64 + corrXYSum = float64(0) hg.AppendBucket(&samples[0].Value, &samples[0].Value, int64(sampleFactor), int64(ndvFactor)) for i := int64(1); i < sampleNum; i++ { corrXYSum += float64(i) * float64(samples[i].Ordinal) cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i].Value) if err != nil { - return nil, errors.Trace(err) + return 0, errors.Trace(err) } totalCount := float64(i+1) * sampleFactor if cmp == 0 { @@ -143,7 +158,7 @@ func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *Sa // a same value only stored in a single bucket, we do not increase bucketIdx even if it exceeds // valuesPerBucket. hg.Buckets[bucketIdx].Count = int64(totalCount) - if float64(hg.Buckets[bucketIdx].Repeat) == ndvFactor { + if hg.Buckets[bucketIdx].Repeat == int64(ndvFactor) { hg.Buckets[bucketIdx].Repeat = int64(2 * sampleFactor) } else { hg.Buckets[bucketIdx].Repeat += int64(sampleFactor) @@ -158,12 +173,15 @@ func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *Sa hg.AppendBucket(&samples[i].Value, &samples[i].Value, int64(totalCount), int64(ndvFactor)) } } - // Compute column order correlation with handle. + return corrXYSum, nil +} + +// calcCorrelation computes column order correlation with the handle. +func calcCorrelation(sampleNum int64, corrXYSum float64) float64 { if sampleNum == 1 { - hg.Correlation = 1 - return hg, nil + return 1 } - // X means the ordinal of the item in original sequence, Y means the oridnal of the item in the + // X means the ordinal of the item in original sequence, Y means the ordinal of the item in the // sorted sequence, we know that X and Y value sets are both: // 0, 1, ..., sampleNum-1 // we can simply compute sum(X) = sum(Y) = @@ -176,11 +194,144 @@ func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *Sa itemsCount := float64(sampleNum) corrXSum := (itemsCount - 1) * itemsCount / 2.0 corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0 - hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum) - return hg, nil + return (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum) } // BuildColumn builds histogram from samples for column. func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) { return BuildColumnHist(ctx, numBuckets, id, collector, tp, collector.Count, collector.FMSketch.NDV(), collector.NullCount) } + +// BuildColumnHistAndTopN build a histogram and TopN for a column from samples. +func BuildColumnHistAndTopN(ctx sessionctx.Context, numBuckets, numTopN int, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, *TopN, error) { + count := collector.Count + ndv := collector.FMSketch.NDV() + nullCount := collector.NullCount + if ndv > count { + ndv = count + } + if count == 0 || len(collector.Samples) == 0 { + return NewHistogram(id, ndv, nullCount, 0, tp, 0, collector.TotalSize), nil, nil + } + sc := ctx.GetSessionVars().StmtCtx + samples := collector.Samples + samples, err := SortSampleItems(sc, samples) + if err != nil { + return nil, nil, err + } + hg := NewHistogram(id, ndv, nullCount, 0, tp, numBuckets, collector.TotalSize) + + sampleNum := int64(len(samples)) + // As we use samples to build the histogram, the bucket number and repeat should multiply a factor. + sampleFactor := float64(count) / float64(len(samples)) + + // Step1: collect topn from samples + + // the topNList is always sorted by count from more to less + topNList := make([]TopNMeta, 0, numTopN) + cur, err := codec.EncodeKey(ctx.GetSessionVars().StmtCtx, nil, samples[0].Value) + if err != nil { + return nil, nil, errors.Trace(err) + } + curCnt := float64(0) + var corrXYSum float64 + + // Iterate through the samples + for i := int64(0); i < sampleNum; i++ { + corrXYSum += float64(i) * float64(samples[i].Ordinal) + + sampleBytes, err := codec.EncodeKey(ctx.GetSessionVars().StmtCtx, nil, samples[i].Value) + if err != nil { + return nil, nil, errors.Trace(err) + } + // case 1, this value is equal to the last one: current count++ + if bytes.Equal(cur, sampleBytes) { + curCnt += 1 + continue + } + // case 2, meet a different value: counting for the "current" is complete + // case 2-1, now topn is empty: append the "current" count directly + if len(topNList) == 0 { + topNList = append(topNList, TopNMeta{Encoded: cur, Count: uint64(curCnt)}) + cur, curCnt = sampleBytes, 1 + continue + } + // case 2-2, now topn is full, and the "current" count is less than the least count in the topn: no need to insert the "current" + if len(topNList) >= numTopN && uint64(curCnt) <= topNList[len(topNList)-1].Count { + cur, curCnt = sampleBytes, 1 + continue + } + // case 2-3, now topn is not full, or the "current" count is larger than the least count in the topn: need to find a slot to insert the "current" + j := len(topNList) + for ; j > 0; j-- { + if uint64(curCnt) < topNList[j-1].Count { + break + } + } + topNList = append(topNList, TopNMeta{}) + copy(topNList[j+1:], topNList[j:]) + topNList[j] = TopNMeta{Encoded: cur, Count: uint64(curCnt)} + if len(topNList) > numTopN { + topNList = topNList[:numTopN] + } + cur, curCnt = sampleBytes, 1 + } + + // Handle the counting for the last value. Basically equal to the case 2 above. + // now topn is empty: append the "current" count directly + if len(topNList) == 0 { + topNList = append(topNList, TopNMeta{Encoded: cur, Count: uint64(curCnt)}) + } else if len(topNList) < numTopN || uint64(curCnt) > topNList[len(topNList)-1].Count { + // now topn is not full, or the "current" count is larger than the least count in the topn: need to find a slot to insert the "current" + j := len(topNList) + for ; j > 0; j-- { + if uint64(curCnt) < topNList[j-1].Count { + break + } + } + topNList = append(topNList, TopNMeta{}) + copy(topNList[j+1:], topNList[j:]) + topNList[j] = TopNMeta{Encoded: cur, Count: uint64(curCnt)} + if len(topNList) > numTopN { + topNList = topNList[:numTopN] + } + } + + // Step2: exclude topn from samples + for i := int64(0); i < int64(len(samples)); i++ { + sampleBytes, err := codec.EncodeKey(ctx.GetSessionVars().StmtCtx, nil, samples[i].Value) + if err != nil { + return nil, nil, errors.Trace(err) + } + for j := 0; j < len(topNList); j++ { + if bytes.Equal(sampleBytes, topNList[j].Encoded) { + // find the same value in topn: need to skip over this value in samples + copy(samples[i:], samples[uint64(i)+topNList[j].Count:]) + samples = samples[:uint64(len(samples))-topNList[j].Count] + i-- + continue + } + } + } + + for i := 0; i < len(topNList); i++ { + topNList[i].Count *= uint64(sampleFactor) + } + topn := &TopN{TopN: topNList} + + if uint64(count) <= topn.TotalCount() || int(hg.NDV) <= len(topn.TopN) { + // TopN includes all sample data + return hg, topn, nil + } + + // Step3: build histogram with the rest samples + if len(samples) > 0 { + _, err = buildHist(sc, hg, samples, count-int64(topn.TotalCount()), ndv-int64(len(topn.TopN)), int64(numBuckets)) + if err != nil { + return nil, nil, err + } + } + + hg.Correlation = calcCorrelation(int64(len(samples)), corrXYSum) + return hg, topn, nil +} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 2040ad0752030..006a6d1dc2711 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -163,7 +163,7 @@ func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, phy if err != nil { return nil, errors.Trace(err) } - jsonTbl.Columns[col.Info.Name.L] = dumpJSONCol(hist, col.CMSketch, col.TopN, nil) + jsonTbl.Columns[col.Info.Name.L] = dumpJSONCol(hist, col.CMSketch, col.TopN, &col.StatsVer) } for _, idx := range tbl.Indices { @@ -208,7 +208,7 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, } for _, col := range tbl.Columns { - err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, 0, 1) + err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.StatsVer), 1) if err != nil { return errors.Trace(err) } @@ -278,6 +278,12 @@ func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *J } cm, topN := statistics.CMSketchAndTopNFromProto(jsonCol.CMSketch) hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.TotColSize, hist.Correlation = colInfo.ID, jsonCol.NullCount, jsonCol.LastUpdateVersion, jsonCol.TotColSize, jsonCol.Correlation + // If the statistics is loaded from a JSON without stats version, + // we set it to 1. + statsVer := int64(statistics.Version1) + if jsonCol.StatsVer != nil { + statsVer = *jsonCol.StatsVer + } col := &statistics.Column{ PhysicalID: physicalID, Histogram: *hist, @@ -286,6 +292,7 @@ func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *J Info: colInfo, Count: count, IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag), + StatsVer: statsVer, } tbl.Columns[col.ID] = col } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index fc25c63d1f92d..d80294f03bf14 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -150,7 +150,7 @@ func (s *testStatsSuite) TestDumpCMSketchWithTopN(c *C) { cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100) stat := h.GetTableStats(tableInfo) - err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.CurStatsVersion, 1) + err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.Version2, 1) c.Assert(err, IsNil) c.Assert(h.Update(is), IsNil) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 09b634f4ea578..7d880464cc036 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -359,6 +359,14 @@ func (h *Handle) LoadNeededHistograms() (err error) { if err != nil { return errors.Trace(err) } + selSQL := fmt.Sprintf("select stats_ver from mysql.stats_histograms where is_index = 0 and table_id = %d and hist_id = %d", col.TableID, col.ColumnID) + rows, _, err := reader.read(selSQL) + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID)) + } tbl.Columns[c.ID] = &statistics.Column{ PhysicalID: col.TableID, Histogram: *hg, @@ -367,7 +375,9 @@ func (h *Handle) LoadNeededHistograms() (err error) { TopN: topN, Count: int64(hg.TotalRowCount()), IsHandle: c.IsHandle, + StatsVer: rows[0].GetInt64(0), } + tbl.Columns[c.ID].Count = int64(tbl.Columns[c.ID].TotalRowCount()) h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) statistics.HistogramNeededColumns.Delete(col) } @@ -461,6 +471,7 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl histVer := row.GetUint64(4) nullCount := row.GetInt64(5) totColSize := row.GetInt64(6) + statsVer := row.GetInt64(7) correlation := row.GetFloat64(9) lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob)) col := table.Columns[histID] @@ -486,7 +497,7 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl (col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) && !loadAll if notNeedLoad { - count, err := h.columnCountFromStorage(reader, table.PhysicalID, histID) + count, err := h.columnCountFromStorage(reader, table.PhysicalID, histID, statsVer) if err != nil { return errors.Trace(err) } @@ -522,7 +533,9 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl ErrorRate: errorRate, IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag), Flag: flag, + StatsVer: statsVer, } + col.Count = int64(col.TotalRowCount()) lastAnalyzePos.Copy(&col.LastAnalyzePos) break } @@ -766,8 +779,8 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID return hg, nil } -func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) { - selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID) +func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID, statsVer int64) (int64, error) { + selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = 0 and hist_id = %d", tableID, colID) rows, _, err := reader.read(selSQL) if err != nil { return 0, errors.Trace(err) @@ -775,7 +788,28 @@ func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int6 if rows[0].IsNull(0) { return 0, nil } - return rows[0].GetMyDecimal(0).ToInt() + count, err := rows[0].GetMyDecimal(0).ToInt() + if err != nil { + return 0, errors.Trace(err) + } + if statsVer == statistics.Version2 { + // Before stats ver 2, histogram represents all data in this column. + // In stats ver 2, histogram + TopN represent all data in this column. + // So we need to add TopN total count here. + selSQL = fmt.Sprintf("select sum(count) from mysql.stats_top_n where table_id = %d and is_index = 0 and hist_id = %d", tableID, colID) + rows, _, err = reader.read(selSQL) + if err != nil { + return 0, errors.Trace(err) + } + if !rows[0].IsNull(0) { + topNCount, err := rows[0].GetMyDecimal(0).ToInt() + if err != nil { + return 0, errors.Trace(err) + } + count += topNCount + } + } + return count, err } func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (version uint64, modifyCount, count int64, err error) { diff --git a/statistics/histogram.go b/statistics/histogram.go index 1e1c6f8a87dae..070590c33fc0f 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -271,10 +271,20 @@ func HistogramEqual(a, b *Histogram, ignoreID bool) bool { // constants for stats version. These const can be used for solving compatibility issue. const ( - CurStatsVersion = Version2 - Version0 = 0 - Version1 = 1 - Version2 = 2 + Version0 = 0 + // In Version1 + // Column stats: CM Sketch is built in TiKV using full data. Histogram is built from samples. TopN is extracted from CM Sketch. + // TopN + CM Sketch represent all data. Histogram also represents all data. + // Index stats: CM Sketch and Histogram is built in TiKV using full data. TopN is extracted from histogram. Then values covered by TopN is removed from CM Sketch. + // TopN + CM Sketch represent all data. Histogram also represents all data. + // Int PK column stats is always Version1 because it only has histogram built from full data. + // Fast analyze is always Version1 currently. + Version1 = 1 + // In Version2 + // Column stats: CM Sketch is not used. TopN and Histogram are built from samples. TopN + Histogram represent all data. + // Index stats: CM SKetch is not used. TopN and Histograms are built in TiKV using full data. NDV is also collected for each bucket in histogram. + // Then values covered by TopN is removed from Histogram. TopN + Histogram represent all data. + Version2 = 2 ) // AnalyzeFlag is set when the statistics comes from analyze and has not been modified by feedback. @@ -434,6 +444,23 @@ func (hg *Histogram) BetweenRowCount(a, b types.Datum) float64 { return lessCountB - lessCountA } +// BetweenRowCount estimates the row count for interval [l, r). +func (c *Column) BetweenRowCount(sc *stmtctx.StatementContext, l, r types.Datum) (float64, error) { + histBetweenCnt := c.Histogram.BetweenRowCount(l, r) + if c.StatsVer <= Version1 { + return histBetweenCnt, nil + } + lBytes, err := codec.EncodeKey(sc, nil, l) + if err != nil { + return 0, errors.Trace(err) + } + rBytes, err := codec.EncodeKey(sc, nil, r) + if err != nil { + return 0, errors.Trace(err) + } + return float64(c.TopN.BetweenCount(lBytes, rBytes)) + histBetweenCnt, nil +} + // TotalRowCount returns the total count of this histogram. func (hg *Histogram) TotalRowCount() float64 { return hg.notNullCount() + float64(hg.NullCount) @@ -808,12 +835,38 @@ type Column struct { ErrorRate Flag int64 LastAnalyzePos types.Datum + StatsVer int64 // StatsVer is the version of the current stats, used to maintain compatibility } func (c *Column) String() string { return c.Histogram.ToString(0) } +// TotalRowCount returns the total count of this column. +func (c *Column) TotalRowCount() float64 { + if c.StatsVer == Version2 { + return c.Histogram.TotalRowCount() + float64(c.TopN.TotalCount()) + } + return c.Histogram.TotalRowCount() +} + +func (c *Column) notNullCount() float64 { + if c.StatsVer == Version2 { + return c.Histogram.notNullCount() + float64(c.TopN.TotalCount()) + } + return c.Histogram.notNullCount() +} + +// GetIncreaseFactor get the increase factor to adjust the final estimated count when the table is modified. +func (c *Column) GetIncreaseFactor(totalCount int64) float64 { + columnCount := c.TotalRowCount() + if columnCount == 0 { + // avoid dividing by 0 + return 1.0 + } + return float64(totalCount) / columnCount +} + // MemoryUsage returns the total memory usage of Histogram and CMSketch in Column. // We ignore the size of other metadata in Column func (c *Column) MemoryUsage() (sum int64) { @@ -834,29 +887,71 @@ func (c *Column) IsInvalid(sc *stmtctx.StatementContext, collPseudo bool) bool { if collPseudo && c.NotAccurate() { return true } - if c.NDV > 0 && c.Len() == 0 && sc != nil { + if c.NDV > 0 && c.notNullCount() == 0 && sc != nil { sc.SetHistogramsNotLoad() HistogramNeededColumns.insert(tableColumnID{TableID: c.PhysicalID, ColumnID: c.Info.ID}) } - return c.TotalRowCount() == 0 || (c.NDV > 0 && c.Len() == 0) + return c.TotalRowCount() == 0 || (c.NDV > 0 && c.notNullCount() == 0) } func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, modifyCount int64) (float64, error) { if val.IsNull() { return float64(c.NullCount), nil } - // All the values are null. - if c.Histogram.Bounds.NumRows() == 0 { - return 0.0, nil + if c.StatsVer < Version2 { + // All the values are null. + if c.Histogram.Bounds.NumRows() == 0 { + return 0.0, nil + } + if c.NDV > 0 && c.outOfRange(val) { + return outOfRangeEQSelectivity(c.NDV, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount(), nil + } + if c.CMSketch != nil { + count, err := queryValue(sc, c.CMSketch, c.TopN, val) + return float64(count), errors.Trace(err) + } + return c.Histogram.equalRowCount(val), nil } - if c.NDV > 0 && c.outOfRange(val) { - return outOfRangeEQSelectivity(c.NDV, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount(), nil + // Stats version == 2 + // 1. try to find this value in TopN + if c.TopN != nil { + valBytes, err := codec.EncodeKey(sc, nil, val) + if err != nil { + return 0, errors.Trace(err) + } + rowcount, ok := c.QueryTopN(valBytes) + if ok { + return float64(rowcount), nil + } } - if c.CMSketch != nil { - count, err := queryValue(sc, c.CMSketch, c.TopN, val) - return float64(count), errors.Trace(err) + // 2. try to find this value in bucket.repeats(the last value in every bucket) + index, match := c.Histogram.Bounds.LowerBound(0, &val) + if index%2 == 1 && match { + return float64(c.Histogram.Buckets[index/2].Repeat), nil } - return c.Histogram.equalRowCount(val), nil + if match { + cmp := chunk.GetCompareFunc(c.Histogram.Tp) + if cmp(c.Histogram.Bounds.GetRow(index), 0, c.Histogram.Bounds.GetRow(index+1), 0) == 0 { + return float64(c.Histogram.Buckets[index/2].Repeat), nil + } + } + // 3. use uniform distribution assumption for the rest + cnt := c.Histogram.notNullCount() + for _, bkt := range c.Histogram.Buckets { + if cnt <= float64(bkt.Repeat) { + return 0, nil + } + cnt -= float64(bkt.Repeat) + } + topNLen := int64(0) + if c.TopN != nil { + topNLen = int64(len(c.TopN.TopN)) + } + ndv := c.NDV - topNLen - int64(len(c.Histogram.Buckets)) + if ndv <= 0 { + return 0, nil + } + return cnt / float64(ndv), nil } // GetColumnRowCount estimates the row count by a slice of Range. @@ -905,7 +1000,10 @@ func (c *Column) GetColumnRowCount(sc *stmtctx.StatementContext, ranges []*range continue } // The interval case. - cnt := c.BetweenRowCount(lowVal, highVal) + cnt, err := c.BetweenRowCount(sc, lowVal, highVal) + if err != nil { + return 0, err + } if (c.outOfRange(lowVal) && !lowVal.IsNull()) || c.outOfRange(highVal) { cnt += outOfRangeEQSelectivity(outOfRangeBetweenRate, modifyCount, int64(c.TotalRowCount())) * c.TotalRowCount() } diff --git a/statistics/statistics_test.go b/statistics/statistics_test.go index 326fae923a078..d7d6aa6bb2226 100644 --- a/statistics/statistics_test.go +++ b/statistics/statistics_test.go @@ -242,6 +242,7 @@ func checkRepeats(c *C, hg *Histogram) { func (s *testStatisticsSuite) TestBuild(c *C) { bucketCount := int64(256) + topNCount := 20 ctx := mock.NewContext() sc := ctx.GetSessionVars().StmtCtx sketch, _, err := buildFMSketch(sc, s.rc.(*recordSet).data, 1000) @@ -275,7 +276,30 @@ func (s *testStatisticsSuite) TestBuild(c *C) { count = col.BetweenRowCount(types.NewIntDatum(3000), types.NewIntDatum(3500)) c.Check(int(count), Equals, 4994) count = col.lessRowCount(types.NewIntDatum(1)) - c.Check(int(count), Equals, 9) + c.Check(int(count), Equals, 5) + + colv2, topnv2, err := BuildColumnHistAndTopN(ctx, int(bucketCount), topNCount, 2, collector, types.NewFieldType(mysql.TypeLonglong)) + c.Check(err, IsNil) + c.Check(topnv2.TopN, NotNil) + expectedTopNCount := []uint64{9990, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30} + for i, meta := range topnv2.TopN { + c.Check(meta.Count, Equals, expectedTopNCount[i]) + } + c.Check(colv2.Len(), Equals, 256) + count = colv2.lessRowCount(types.NewIntDatum(1000)) + c.Check(int(count), Equals, 325) + count = colv2.lessRowCount(types.NewIntDatum(2000)) + c.Check(int(count), Equals, 9430) + count = colv2.greaterRowCount(types.NewIntDatum(2000)) + c.Check(int(count), Equals, 80008) + count = colv2.lessRowCount(types.NewIntDatum(200000000)) + c.Check(int(count), Equals, 89440) + count = colv2.greaterRowCount(types.NewIntDatum(200000000)) + c.Check(count, Equals, 0.0) + count = colv2.BetweenRowCount(types.NewIntDatum(3000), types.NewIntDatum(3500)) + c.Check(int(count), Equals, 4995) + count = colv2.lessRowCount(types.NewIntDatum(1)) + c.Check(int(count), Equals, 0) builder := SampleBuilder{ Sc: mock.NewContext().GetSessionVars().StmtCtx, diff --git a/statistics/table.go b/statistics/table.go index 69694e5c5a3a1..42d82016b478e 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -271,7 +271,10 @@ func (t *Table) ColumnBetweenRowCount(sc *stmtctx.StatementContext, a, b types.D if !ok || c.IsInvalid(sc, t.Pseudo) { return float64(t.Count) / pseudoBetweenRate } - count := c.BetweenRowCount(a, b) + count, err := c.BetweenRowCount(sc, a, b) + if err != nil { + return 0 + } if a.IsNull() { count += float64(c.NullCount) }