Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, stats: extract topn from cm sketch #11409

Merged
merged 7 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions executor/analyze.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
}
}
}
return hist, cms, nil
err := hist.ExtractTopN(cms, len(e.idxInfo.Columns), uint32(e.opts[ast.AnalyzeOptNumTopN]))
return hist, cms, err
}

func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
Expand Down Expand Up @@ -508,6 +509,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
cms = append(cms, nil)
}
for i, col := range e.colsInfo {
collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]))
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
Expand Down Expand Up @@ -1213,7 +1215,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
return analyzeResult{Err: err, job: idxExec.job}
}
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
Expand Down
33 changes: 30 additions & 3 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ func (s *testSuite1) TestAnalyzeParameters(c *C) {
tbl := s.dom.StatsHandle().GetTableStats(tableInfo)
col := tbl.Columns[1]
c.Assert(col.Len(), Equals, 20)
c.Assert(len(col.CMSketch.TopN()), Equals, 20)
c.Assert(len(col.CMSketch.TopN()), Equals, 1)
width, depth := col.CMSketch.GetWidthAndDepth()
c.Assert(depth, Equals, int32(5))
c.Assert(width, Equals, int32(2048))

tk.MustExec("analyze table t with 4 buckets, 1 topn, 4 cmsketch width, 4 cmsketch depth")
tk.MustExec("analyze table t with 4 buckets, 0 topn, 4 cmsketch width, 4 cmsketch depth")
tbl = s.dom.StatsHandle().GetTableStats(tableInfo)
col = tbl.Columns[1]
c.Assert(col.Len(), Equals, 4)
c.Assert(len(col.CMSketch.TopN()), Equals, 1)
c.Assert(len(col.CMSketch.TopN()), Equals, 0)
width, depth = col.CMSketch.GetWidthAndDepth()
c.Assert(depth, Equals, int32(4))
c.Assert(width, Equals, int32(4))
Expand Down Expand Up @@ -462,3 +462,30 @@ func (s *testSuite1) TestFailedAnalyzeRequest(c *C) {
c.Assert(err.Error(), Equals, "mock buildStatsFromResult error")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"), IsNil)
}

func (s *testSuite1) TestExtractTopN(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, 0)", i+10))
}
tk.MustExec("analyze table t")
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tblStats := s.dom.StatsHandle().GetTableStats(tblInfo)
colStats := tblStats.Columns[tblInfo.Columns[1].ID]
c.Assert(len(colStats.CMSketch.TopN()), Equals, 1)
item := colStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
idxStats := tblStats.Indices[tblInfo.Indices[0].ID]
c.Assert(len(idxStats.CMSketch.TopN()), Equals, 1)
item = idxStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
}
37 changes: 29 additions & 8 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func newTopNHelper(sample [][]byte, numTop uint32) *topNHelper {
if i >= numTop && sorted[i]*3 < sorted[numTop-1]*2 && last != sorted[i] {
break
}
if sorted[i] == 1 {
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
break
}
last = sorted[i]
sumTopN += sorted[i]
}
Expand Down Expand Up @@ -244,6 +247,14 @@ func (c *CMSketch) setValue(h1, h2 uint64, count uint64) {
}
}

func (c *CMSketch) subValue(h1, h2 uint64, count uint64) {
c.count -= count
for i := range c.table {
j := (h1 + h2*uint64(i)) % uint64(c.width)
c.table[i][j] = c.table[i][j] - uint32(count)
}
}

func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (uint64, error) {
bytes, err := codec.EncodeValue(sc, nil, val)
if err != nil {
Expand Down Expand Up @@ -287,7 +298,7 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32) {
func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32, usingMax bool) {
counter := make(map[hack.MutableString]uint64)
for _, metas := range lTopN {
for _, meta := range metas {
Expand All @@ -296,7 +307,11 @@ func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*T
}
for _, metas := range rTopN {
for _, meta := range metas {
counter[hack.String(meta.Data)] += meta.Count
if usingMax {
counter[hack.String(meta.Data)] = mathutil.MaxUint64(counter[hack.String(meta.Data)], meta.Count)
} else {
counter[hack.String(meta.Data)] += meta.Count
}
}
}
sorted := make([]uint64, len(counter))
Expand Down Expand Up @@ -326,7 +341,7 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
c.mergeTopN(c.topN, rc.topN, numTopN)
c.mergeTopN(c.topN, rc.topN, numTopN, false)
}
c.count += rc.count
for i := range c.table {
Expand All @@ -345,12 +360,12 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
// (3): For values that appears both in `c` and `rc`, if they do not appear partially in `c` and `rc`, for example,
// if `v` appears 5 times in the table, it can appears 5 times in `c` and 3 times in `rc`, then `max` also gives the correct answer.
// So in fact, if we can know the number of appearances of each value in the first place, it is better to use `max` to construct the CM sketch rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch, numTopN uint32) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
c.mergeTopN(c.topN, rc.topN, numTopN, true)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
}
for i := range c.table {
c.count = 0
Expand Down Expand Up @@ -393,10 +408,10 @@ func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
c.count = c.count + uint64(counter)
}
}
c.defaultValue = protoSketch.DefaultValue
if len(protoSketch.TopN) == 0 {
return c
}
c.defaultValue = protoSketch.DefaultValue
c.topN = make(map[uint64][]*TopNMeta)
for _, e := range protoSketch.TopN {
h1, h2 := murmur3.Sum128(e.Data)
Expand Down Expand Up @@ -450,9 +465,15 @@ func LoadCMSketchWithTopN(exec sqlexec.RestrictedSQLExecutor, tableID, isIndex,
return decodeCMSketch(cms, topN)
}

// TotalCount returns the count, it is only used for test.
// TotalCount returns the total count in the sketch, it is only used for test.
func (c *CMSketch) TotalCount() uint64 {
return c.count
res := c.count
for _, metas := range c.topN {
for _, meta := range metas {
res += meta.Count
}
}
return res
}

// Equal tests if two CM Sketch equal, it is only used for test.
Expand Down
2 changes: 1 addition & 1 deletion statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (s *testStatisticsSuite) TestMergeCMSketch4IncrementalAnalyze(c *C) {
for key, val := range rMap {
lMap[key] += val
}
c.Assert(lSketch.MergeCMSketch4IncrementalAnalyze(rSketch), IsNil)
c.Assert(lSketch.MergeCMSketch4IncrementalAnalyze(rSketch, 0), IsNil)
avg, err = averageAbsoluteError(lSketch, lMap)
c.Assert(err, IsNil)
c.Check(avg, LessEqual, t.avgError)
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
}

func (s *testStatsSuite) TestIndexQueryFeedback(c *C) {
c.Skip("support update the topn of index equal conditions")
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)

Expand Down
66 changes: 66 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"fmt"
"math"
"sort"
"strings"
"time"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1054,3 +1056,67 @@ func matchPrefix(row chunk.Row, colIdx int, ad *types.Datum) bool {
}
return false
}

type dataCnt struct {
data []byte
cnt uint64
}

func getIndexPrefixLens(data []byte, numCols int) (prefixLens []int, err error) {
prefixLens = make([]int, 0, numCols)
var colData []byte
prefixLen := 0
for len(data) > 0 {
colData, data, err = codec.CutOne(data)
if err != nil {
return nil, err
}
prefixLen += len(colData)
prefixLens = append(prefixLens, prefixLen)
}
return prefixLens, nil
}

// ExtractTopN extracts topn from histogram.
func (hg *Histogram) ExtractTopN(cms *CMSketch, numCols int, numTopN uint32) error {
if hg.Len() == 0 || cms == nil || numTopN == 0 {
return nil
}
dataSet := make(map[string]struct{}, hg.Bounds.NumRows())
dataCnts := make([]dataCnt, 0, hg.Bounds.NumRows())
hg.PreCalculateScalar()
// Set a limit on the frequency of boundary values to avoid extract values with low frequency.
limit := hg.notNullCount() / float64(hg.Len())
// Since our histogram are equal depth, they must occurs on the boundaries of buckets.
for i := 0; i < hg.Bounds.NumRows(); i++ {
data := hg.Bounds.GetRow(i).GetBytes(0)
prefixLens, err := getIndexPrefixLens(data, numCols)
if err != nil {
return err
}
for _, prefixLen := range prefixLens {
prefixColData := data[:prefixLen]
_, ok := dataSet[string(prefixColData)]
if ok {
continue
}
dataSet[string(prefixColData)] = struct{}{}
res := hg.BetweenRowCount(types.NewBytesDatum(prefixColData), types.NewBytesDatum(kv.Key(prefixColData).PrefixNext()))
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
if res >= limit {
dataCnts = append(dataCnts, dataCnt{prefixColData, uint64(res)})
}
}
}
sort.SliceStable(dataCnts, func(i, j int) bool { return dataCnts[i].cnt >= dataCnts[j].cnt })
cms.topN = make(map[uint64][]*TopNMeta)
if len(dataCnts) > int(numTopN) {
dataCnts = dataCnts[:numTopN]
}
for _, dataCnt := range dataCnts {
h1, h2 := murmur3.Sum128(dataCnt.data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, dataCnt.data, realCnt})
}
return nil
}
31 changes: 31 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
)

// SampleItem is an item of sampled column value.
Expand Down Expand Up @@ -257,3 +259,32 @@ func RowToDatums(row chunk.Row, fields []*ast.ResultField) []types.Datum {
}
return datums
}

// ExtractTopN extracts the topn from the CM Sketch.
func (c *SampleCollector) ExtractTopN(numTop uint32) {
if numTop == 0 {
return
}
values := make([][]byte, 0, len(c.Samples))
for _, sample := range c.Samples {
values = append(values, sample.Value.GetBytes())
}
helper := newTopNHelper(values, numTop)
cms := c.CMSketch
cms.topN = make(map[uint64][]*TopNMeta)
dataCnts := make([]dataCnt, 0, len(helper.counter))
for key, cnt := range helper.counter {
if cnt >= helper.lastVal {
dataCnts = append(dataCnts, dataCnt{hack.Slice(string(key)), cnt})
}
}
// Sort them decreasingly so we can handle most frequent values first and reduce the probability of hash collision
// by small values.
sort.SliceStable(dataCnts, func(i, j int) bool { return dataCnts[i].cnt >= dataCnts[j].cnt })
for _, dc := range dataCnts {
h1, h2 := murmur3.Sum128(dc.data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, dc.data, realCnt})
}
}