From 3dd842f50a75ebbf9a4f9d7b30fb2ce8a8dd4b37 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Wed, 13 Jan 2021 18:24:21 +0800 Subject: [PATCH] statistics: add bucket ndv for index histogram (#20580) Co-authored-by: Yuanjia Zhang Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com> --- cmd/explaintest/run-tests.sh | 2 +- distsql/select_result.go | 3 +- distsql/stream.go | 2 +- executor/analyze.go | 12 +- executor/analyze_test.go | 97 ++++++------- executor/show_stats.go | 1 + executor/show_stats_test.go | 24 ++-- planner/core/planbuilder.go | 4 +- session/bootstrap.go | 15 +- statistics/builder.go | 20 ++- statistics/feedback.go | 77 +++++++---- statistics/feedback_test.go | 129 +++++++++++------- statistics/handle/handle.go | 6 +- statistics/handle/update.go | 18 ++- statistics/handle/update_test.go | 102 +++++++------- statistics/histogram.go | 87 ++++++++---- statistics/histogram_test.go | 28 ++-- statistics/sample_test.go | 2 +- statistics/statistics_test.go | 14 +- statistics/table.go | 2 +- statistics/testdata/stats_suite_out.json | 4 +- store/mockstore/mocktikv/analyze.go | 4 +- .../mockstore/unistore/cophandler/analyze.go | 14 +- .../unistore/cophandler/closure_exec.go | 30 ++++ .../unistore/cophandler/cop_handler.go | 7 +- util/testkit/testkit.go | 2 +- 26 files changed, 439 insertions(+), 267 deletions(-) diff --git a/cmd/explaintest/run-tests.sh b/cmd/explaintest/run-tests.sh index b614a9ced73f3..1677b88f2bcce 100755 --- a/cmd/explaintest/run-tests.sh +++ b/cmd/explaintest/run-tests.sh @@ -208,7 +208,7 @@ if [ "${TIDB_TEST_STORE_NAME}" = "tikv" ]; then $tidb_server -P "$port" -status "$status" -config config.toml -store tikv -path "${TIKV_PATH}" > $explain_test_log 2>&1 & SERVER_PID=$! else - $tidb_server -P "$port" -status "$status" -config config.toml -store mocktikv -path "" > $explain_test_log 2>&1 & + $tidb_server -P "$port" -status "$status" -config config.toml -store unistore -path "" > $explain_test_log 2>&1 & SERVER_PID=$! fi echo "tidb-server(PID: $SERVER_PID) started" diff --git a/distsql/select_result.go b/distsql/select_result.go index 92bd66506dedb..eae235f892ffe 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -149,9 +149,8 @@ func (r *selectResult) fetchResp(ctx context.Context) error { sc.AppendWarning(dbterror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) } if r.feedback != nil { - r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) + r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts, r.selectResp.Ndvs) } - r.partialCount++ hasStats, ok := resultSubset.(CopRuntimeStats) diff --git a/distsql/stream.go b/distsql/stream.go index 56e8b9e89b244..912f4d5bfd09c 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -105,7 +105,7 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons if err != nil { return false, errors.Trace(err) } - r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts) + r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts, stream.Ndvs) r.partialCount++ hasStats, ok := resultSubset.(CopRuntimeStats) diff --git a/executor/analyze.go b/executor/analyze.go index c0837bf5e07a0..a5ce368901f0d 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -335,6 +335,10 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee cms = statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth])) topn = statistics.NewTopN(int(e.opts[ast.AnalyzeOptNumTopN])) } + statsVer := statistics.Version1 + if e.analyzePB.IdxReq.Version != nil { + statsVer = int(*e.analyzePB.IdxReq.Version) + } for { data, err := result.NextRaw(context.TODO()) if err != nil { @@ -350,7 +354,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee } respHist := statistics.HistogramFromProto(resp.Hist) e.job.Update(int64(respHist.TotalRowCount())) - hist, err = statistics.MergeHistograms(e.ctx.GetSessionVars().StmtCtx, hist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets])) + hist, err = statistics.MergeHistograms(e.ctx.GetSessionVars().StmtCtx, hist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets]), statsVer) if err != nil { return nil, nil, nil, err } @@ -556,7 +560,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo if hasPkHist(e.handleCols) { respHist := statistics.HistogramFromProto(resp.PkHist) rowCount = int64(respHist.TotalRowCount()) - pkHist, err = statistics.MergeHistograms(sc, pkHist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets])) + pkHist, err = statistics.MergeHistograms(sc, pkHist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1) if err != nil { return nil, nil, nil, nil, err } @@ -1244,7 +1248,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult if err != nil { return analyzeResult{Err: err, job: idxExec.job} } - hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.opts[ast.AnalyzeOptNumBuckets])) + hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1) if err != nil { return analyzeResult{Err: err, job: idxExec.job} } @@ -1295,7 +1299,7 @@ func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult { return analyzeResult{Err: err, job: colExec.job} } hist := hists[0] - hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.opts[ast.AnalyzeOptNumBuckets])) + hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1) if err != nil { return analyzeResult{Err: err, job: colExec.job} } diff --git a/executor/analyze_test.go b/executor/analyze_test.go index f8fdf69f56be5..02a20e787f5fa 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -461,8 +461,8 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) { tk.MustExec("insert into t2 values (0), (18446744073709551615)") tk.MustExec("analyze table t2") tk.MustQuery("show stats_buckets where table_name = 't2'").Check(testkit.Rows( - "test t2 a 0 0 1 1 0 0", - "test t2 a 0 1 2 1 18446744073709551615 18446744073709551615")) + "test t2 a 0 0 1 1 0 0 0", + "test t2 a 0 1 2 1 18446744073709551615 18446744073709551615 0")) tk.MustExec(`set @@tidb_partition_prune_mode='` + string(variable.StaticOnly) + `'`) tk.MustExec(`create table t3 (id int, v int, primary key(id), index k(v)) partition by hash (id) partitions 4`) @@ -531,6 +531,7 @@ func (s *testSuite1) TestAnalyzeIncremental(c *C) { } func (s *testSuite1) TestAnalyzeIncrementalStreaming(c *C) { + c.Skip("unistore hasn't support streaming yet.") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.Se.GetSessionVars().EnableStreaming = true @@ -545,13 +546,13 @@ func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) { tk.MustQuery("show stats_buckets").Check(testkit.Rows()) tk.MustExec("insert into t values (1,1)") tk.MustExec("analyze incremental table t index") - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t idx 1 0 1 1 1 1")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t idx 1 0 1 1 1 1 0")) tk.MustExec("insert into t values (2,2)") tk.MustExec("analyze incremental table t index") - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t a 0 1 2 1 2 2 0", "test t idx 1 0 1 1 1 1 0", "test t idx 1 1 2 1 2 2 0")) tk.MustExec("analyze incremental table t index") // Result should not change. - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t a 0 1 2 1 2 2 0", "test t idx 1 0 1 1 1 1 0", "test t idx 1 1 2 1 2 2 0")) // Test analyze incremental with feedback. tk.MustExec("insert into t values (3,3)") @@ -574,7 +575,7 @@ func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) { c.Assert(h.DumpStatsFeedbackToKV(), IsNil) c.Assert(h.HandleUpdateStats(is), IsNil) c.Assert(h.Update(is), IsNil) - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 3 0 2 2147483647", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t a 0 1 3 0 2 2147483647 0", "test t idx 1 0 1 1 1 1 0", "test t idx 1 1 2 1 2 2 0")) tblStats := h.GetTableStats(tblInfo) val, err := codec.EncodeKey(tk.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(3)) c.Assert(err, IsNil) @@ -583,8 +584,8 @@ func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) { c.Assert(statistics.IsAnalyzed(tblStats.Columns[tblInfo.Columns[0].ID].Flag), IsFalse) tk.MustExec("analyze incremental table t index") - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t a 0 2 3 1 3 3", - "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2", "test t idx 1 2 3 1 3 3")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t a 0 1 2 1 2 2 0", "test t a 0 2 3 1 3 3 0", + "test t idx 1 0 1 1 1 1 0", "test t idx 1 1 2 1 2 2 0", "test t idx 1 2 3 1 3 3 0")) tblStats = h.GetTableStats(tblInfo) c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].QueryBytes(val), Equals, uint64(1)) } @@ -769,36 +770,36 @@ func (s *testSuite1) TestNormalAnalyzeOnCommonHandle(c *C) { tk.MustExec("analyze table t1, t2, t3") tk.MustQuery(`show stats_buckets where table_name in ("t1", "t2", "t3")`).Sort().Check(testkit.Rows( - "test t1 a 0 0 1 1 1 1", - "test t1 a 0 1 2 1 2 2", - "test t1 a 0 2 3 1 3 3", - "test t1 b 0 0 1 1 1 1", - "test t1 b 0 1 2 1 2 2", - "test t1 b 0 2 3 1 3 3", - "test t2 PRIMARY 1 0 1 1 111 111", - "test t2 PRIMARY 1 1 2 1 222 222", - "test t2 PRIMARY 1 2 3 1 333 333", - "test t2 a 0 0 1 1 111 111", - "test t2 a 0 1 2 1 222 222", - "test t2 a 0 2 3 1 333 333", - "test t2 b 0 0 1 1 1 1", - "test t2 b 0 1 2 1 2 2", - "test t2 b 0 2 3 1 3 3", - "test t3 PRIMARY 1 0 1 1 (1, 1) (1, 1)", - "test t3 PRIMARY 1 1 2 1 (2, 2) (2, 2)", - "test t3 PRIMARY 1 2 3 1 (3, 3) (3, 3)", - "test t3 a 0 0 1 1 1 1", - "test t3 a 0 1 2 1 2 2", - "test t3 a 0 2 3 1 3 3", - "test t3 b 0 0 1 1 1 1", - "test t3 b 0 1 2 1 2 2", - "test t3 b 0 2 3 1 3 3", - "test t3 c 0 0 1 1 1 1", - "test t3 c 0 1 2 1 2 2", - "test t3 c 0 2 3 1 3 3", - "test t3 c 1 0 1 1 1 1", - "test t3 c 1 1 2 1 2 2", - "test t3 c 1 2 3 1 3 3")) + "test t1 a 0 0 1 1 1 1 0", + "test t1 a 0 1 2 1 2 2 0", + "test t1 a 0 2 3 1 3 3 0", + "test t1 b 0 0 1 1 1 1 0", + "test t1 b 0 1 2 1 2 2 0", + "test t1 b 0 2 3 1 3 3 0", + "test t2 PRIMARY 1 0 1 1 111 111 0", + "test t2 PRIMARY 1 1 2 1 222 222 0", + "test t2 PRIMARY 1 2 3 1 333 333 0", + "test t2 a 0 0 1 1 111 111 0", + "test t2 a 0 1 2 1 222 222 0", + "test t2 a 0 2 3 1 333 333 0", + "test t2 b 0 0 1 1 1 1 0", + "test t2 b 0 1 2 1 2 2 0", + "test t2 b 0 2 3 1 3 3 0", + "test t3 PRIMARY 1 0 1 1 (1, 1) (1, 1) 0", + "test t3 PRIMARY 1 1 2 1 (2, 2) (2, 2) 0", + "test t3 PRIMARY 1 2 3 1 (3, 3) (3, 3) 0", + "test t3 a 0 0 1 1 1 1 0", + "test t3 a 0 1 2 1 2 2 0", + "test t3 a 0 2 3 1 3 3 0", + "test t3 b 0 0 1 1 1 1 0", + "test t3 b 0 1 2 1 2 2 0", + "test t3 b 0 2 3 1 3 3 0", + "test t3 c 0 0 1 1 1 1 0", + "test t3 c 0 1 2 1 2 2 0", + "test t3 c 0 2 3 1 3 3 0", + "test t3 c 1 0 1 1 1 1 0", + "test t3 c 1 1 2 1 2 2 0", + "test t3 c 1 2 3 1 3 3 0")) } func (s *testSuite1) TestDefaultValForAnalyze(c *C) { @@ -846,15 +847,15 @@ func (s *testSerialSuite2) TestIssue20874(c *C) { tk.MustExec("insert into t values ('#', 'C'), ('$', 'c'), ('a', 'a')") tk.MustExec("analyze table t") tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check(testkit.Rows( - "test t a 0 0 1 1 \x02\xd2 \x02\xd2", - "test t a 0 1 2 1 \x0e\x0f \x0e\x0f", - "test t a 0 2 3 1 \x0e3 \x0e3", - "test t b 0 0 1 1 \x00A \x00A", - "test t b 0 1 3 2 \x00C \x00C", - "test t idxa 1 0 1 1 \x02\xd2 \x02\xd2", - "test t idxa 1 1 2 1 \x0e\x0f \x0e\x0f", - "test t idxa 1 2 3 1 \x0e3 \x0e3", - "test t idxb 1 0 1 1 \x00A \x00A", - "test t idxb 1 1 3 2 \x00C \x00C", + "test t a 0 0 1 1 \x02\xd2 \x02\xd2 0", + "test t a 0 1 2 1 \x0e\x0f \x0e\x0f 0", + "test t a 0 2 3 1 \x0e3 \x0e3 0", + "test t b 0 0 1 1 \x00A \x00A 0", + "test t b 0 1 3 2 \x00C \x00C 0", + "test t idxa 1 0 1 1 \x02\xd2 \x02\xd2 0", + "test t idxa 1 1 2 1 \x0e\x0f \x0e\x0f 0", + "test t idxa 1 2 3 1 \x0e3 \x0e3 0", + "test t idxb 1 0 1 1 \x00A \x00A 0", + "test t idxb 1 1 3 2 \x00C \x00C 0", )) } diff --git a/executor/show_stats.go b/executor/show_stats.go index ad9c454e4b96f..86c8974d0af95 100644 --- a/executor/show_stats.go +++ b/executor/show_stats.go @@ -259,6 +259,7 @@ func (e *ShowExec) bucketsToRows(dbName, tblName, partitionName, colName string, hist.Buckets[i].Repeat, lowerBoundStr, upperBoundStr, + hist.Buckets[i].NDV, }) } return nil diff --git a/executor/show_stats_test.go b/executor/show_stats_test.go index 270c35f5abf2d..f21ada8ea2b1c 100644 --- a/executor/show_stats_test.go +++ b/executor/show_stats_test.go @@ -80,36 +80,36 @@ func (s *testShowStatsSuite) TestShowStatsBuckets(c *C) { tk.MustExec("insert into t values (1,1)") tk.MustExec("analyze table t") result := tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t b 0 0 1 1 1 1", "test t idx 1 0 1 1 (1, 1) (1, 1)")) + result.Check(testkit.Rows("test t a 0 0 1 1 1 1 0", "test t b 0 0 1 1 1 1 0", "test t idx 1 0 1 1 (1, 1) (1, 1) 0")) result = tk.MustQuery("show stats_buckets where column_name = 'idx'") - result.Check(testkit.Rows("test t idx 1 0 1 1 (1, 1) (1, 1)")) + result.Check(testkit.Rows("test t idx 1 0 1 1 (1, 1) (1, 1) 0")) tk.MustExec("drop table t") tk.MustExec("create table t (`a` datetime, `b` int, key `idx`(`a`, `b`))") tk.MustExec("insert into t values (\"2020-01-01\", 1)") tk.MustExec("analyze table t") result = tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 00:00:00 2020-01-01 00:00:00", "test t b 0 0 1 1 1 1", "test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1)")) + result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 00:00:00 2020-01-01 00:00:00 0", "test t b 0 0 1 1 1 1 0", "test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1) 0")) result = tk.MustQuery("show stats_buckets where column_name = 'idx'") - result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1)")) + result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1) 0")) tk.MustExec("drop table t") tk.MustExec("create table t (`a` date, `b` int, key `idx`(`a`, `b`))") tk.MustExec("insert into t values (\"2020-01-01\", 1)") tk.MustExec("analyze table t") result = tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 2020-01-01", "test t b 0 0 1 1 1 1", "test t idx 1 0 1 1 (2020-01-01, 1) (2020-01-01, 1)")) + result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 2020-01-01 0", "test t b 0 0 1 1 1 1 0", "test t idx 1 0 1 1 (2020-01-01, 1) (2020-01-01, 1) 0")) result = tk.MustQuery("show stats_buckets where column_name = 'idx'") - result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01, 1) (2020-01-01, 1)")) + result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01, 1) (2020-01-01, 1) 0")) tk.MustExec("drop table t") tk.MustExec("create table t (`a` timestamp, `b` int, key `idx`(`a`, `b`))") tk.MustExec("insert into t values (\"2020-01-01\", 1)") tk.MustExec("analyze table t") result = tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 00:00:00 2020-01-01 00:00:00", "test t b 0 0 1 1 1 1", "test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1)")) + result.Check(testkit.Rows("test t a 0 0 1 1 2020-01-01 00:00:00 2020-01-01 00:00:00 0", "test t b 0 0 1 1 1 1 0", "test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1) 0")) result = tk.MustQuery("show stats_buckets where column_name = 'idx'") - result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1)")) + result.Check(testkit.Rows("test t idx 1 0 1 1 (2020-01-01 00:00:00, 1) (2020-01-01 00:00:00, 1) 0")) } func (s *testShowStatsSuite) TestShowStatsHasNullValue(c *C) { @@ -124,14 +124,14 @@ func (s *testShowStatsSuite) TestShowStatsHasNullValue(c *C) { tk.MustExec("insert into t values(1)") tk.MustExec("analyze table t") tk.MustQuery("show stats_buckets").Sort().Check(testkit.Rows( - "test t a 0 0 1 1 1 1", - "test t idx 1 0 1 1 1 1", + "test t a 0 0 1 1 1 1 0", + "test t idx 1 0 1 1 1 1 0", )) tk.MustExec("drop table t") tk.MustExec("create table t (a int, b int, index idx(a, b))") tk.MustExec("insert into t values(NULL, NULL)") tk.MustExec("analyze table t") - tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t idx 1 0 1 1 (NULL, NULL) (NULL, NULL)")) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t idx 1 0 1 1 (NULL, NULL) (NULL, NULL) 0")) tk.MustExec("drop table t") tk.MustExec("create table t(a int, b int, c int, index idx_b(b), index idx_c_a(c, a))") @@ -201,7 +201,7 @@ func (s *testShowStatsSuite) TestShowPartitionStats(c *C) { c.Assert(result.Rows()[2][3], Equals, "idx") result = tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 b 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1")) + result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1 0", "test t p0 b 0 0 1 1 1 1 0", "test t p0 idx 1 0 1 1 1 1 0")) result = tk.MustQuery("show stats_healthy") result.Check(testkit.Rows("test t p0 100")) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d71f156e94694..3661a6436daaf 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3671,9 +3671,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeDouble} case ast.ShowStatsBuckets: names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Bucket_id", "Count", - "Repeats", "Lower_Bound", "Upper_Bound"} + "Repeats", "Lower_Bound", "Upper_Bound", "Ndv"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeTiny, mysql.TypeLonglong, - mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar} + mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong} case ast.ShowStatsTopN: names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Value", "Count"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeTiny, mysql.TypeVarchar, mysql.TypeLonglong} diff --git a/session/bootstrap.go b/session/bootstrap.go index 2bfb718d88c48..e1dfbcabb7f9a 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -207,7 +207,8 @@ const ( count BIGINT(64) NOT NULL, repeats BIGINT(64) NOT NULL, upper_bound BLOB NOT NULL, - lower_bound BLOB , + lower_bound BLOB , + ndv BIGINT NOT NULL DEFAULT 0, UNIQUE INDEX tbl(table_id, is_index, hist_id, bucket_id) );` @@ -456,9 +457,11 @@ const ( version60 = 60 // version61 restore all SQL bindings. version61 = 61 + // version62 add column ndv for mysql.stats_buckets. + version62 = 62 // please make sure this is the largest version - currentBootstrapVersion = version61 + currentBootstrapVersion = version62 ) var ( @@ -524,6 +527,7 @@ var ( upgradeToVer59, upgradeToVer60, upgradeToVer61, + upgradeToVer62, } ) @@ -1412,6 +1416,13 @@ func writeMemoryQuotaQuery(s Session) { mustExecute(s, sql) } +func upgradeToVer62(s Session, ver int64) { + if ver >= version62 { + return + } + doReentrantDDL(s, "ALTER TABLE mysql.stats_buckets ADD COLUMN `ndv` bigint not null default 0", infoschema.ErrColumnExists) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`, diff --git a/statistics/builder.go b/statistics/builder.go index 0c2182bc7c7af..774180fa48785 100644 --- a/statistics/builder.go +++ b/statistics/builder.go @@ -32,15 +32,17 @@ type SortedBuilder struct { bucketIdx int64 Count int64 hist *Histogram + needBucketNDV bool } // NewSortedBuilder creates a new SortedBuilder. -func NewSortedBuilder(sc *stmtctx.StatementContext, numBuckets, id int64, tp *types.FieldType) *SortedBuilder { +func NewSortedBuilder(sc *stmtctx.StatementContext, numBuckets, id int64, tp *types.FieldType, statsVer int) *SortedBuilder { return &SortedBuilder{ sc: sc, numBuckets: numBuckets, valuesPerBucket: 1, hist: NewHistogram(id, 0, 0, 0, tp, int(numBuckets), 0), + needBucketNDV: statsVer == Version2, } } @@ -52,8 +54,14 @@ func (b *SortedBuilder) Hist() *Histogram { // Iterate updates the histogram incrementally. func (b *SortedBuilder) Iterate(data types.Datum) error { b.Count++ + appendBucket := b.hist.AppendBucket + if b.needBucketNDV { + appendBucket = func(lower, upper *types.Datum, count, repeat int64) { + b.hist.AppendBucketWithNDV(lower, upper, count, repeat, 1) + } + } if b.Count == 1 { - b.hist.AppendBucket(&data, &data, 1, 1) + appendBucket(&data, &data, 1, 1) b.hist.NDV = 1 return nil } @@ -69,7 +77,7 @@ func (b *SortedBuilder) Iterate(data types.Datum) error { b.hist.Buckets[b.bucketIdx].Repeat++ } else if b.hist.Buckets[b.bucketIdx].Count+1-b.lastNumber <= b.valuesPerBucket { // The bucket still have room to store a new item, update the bucket. - b.hist.updateLastBucket(&data, b.hist.Buckets[b.bucketIdx].Count+1, 1) + b.hist.updateLastBucket(&data, b.hist.Buckets[b.bucketIdx].Count+1, 1, b.needBucketNDV) b.hist.NDV++ } else { // All buckets are full, we should merge buckets. @@ -85,11 +93,11 @@ func (b *SortedBuilder) Iterate(data types.Datum) error { } // We may merge buckets, so we should check it again. if b.hist.Buckets[b.bucketIdx].Count+1-b.lastNumber <= b.valuesPerBucket { - b.hist.updateLastBucket(&data, b.hist.Buckets[b.bucketIdx].Count+1, 1) + b.hist.updateLastBucket(&data, b.hist.Buckets[b.bucketIdx].Count+1, 1, b.needBucketNDV) } else { b.lastNumber = b.hist.Buckets[b.bucketIdx].Count b.bucketIdx++ - b.hist.AppendBucket(&data, &data, b.lastNumber+1, 1) + appendBucket(&data, &data, b.lastNumber+1, 1) } b.hist.NDV++ } @@ -165,7 +173,7 @@ func buildHist(sc *stmtctx.StatementContext, hg *Histogram, samples []*SampleIte } } else if totalCount-float64(lastCount) <= valuesPerBucket { // The bucket still have room to store a new item, update the bucket. - hg.updateLastBucket(&samples[i].Value, int64(totalCount), int64(ndvFactor)) + hg.updateLastBucket(&samples[i].Value, int64(totalCount), int64(ndvFactor), false) } else { lastCount = hg.Buckets[bucketIdx].Count // The bucket is full, store the item in the next bucket. diff --git a/statistics/feedback.go b/statistics/feedback.go index 3e241338d7ee3..7e2e4225925ed 100644 --- a/statistics/feedback.go +++ b/statistics/feedback.go @@ -44,6 +44,7 @@ type Feedback struct { Upper *types.Datum Count int64 Repeat int64 + Ndv int64 } // QueryFeedback is used to represent the query feedback info. It contains the query's scan ranges and number of rows @@ -236,7 +237,7 @@ func (q *QueryFeedback) DecodeIntValues() *QueryFeedback { func (q *QueryFeedback) StoreRanges(ranges []*ranger.Range) { q.Feedback = make([]Feedback, 0, len(ranges)) for _, ran := range ranges { - q.Feedback = append(q.Feedback, Feedback{&ran.LowVal[0], &ran.HighVal[0], 0, 0}) + q.Feedback = append(q.Feedback, Feedback{&ran.LowVal[0], &ran.HighVal[0], 0, 0, 0}) } } @@ -258,7 +259,7 @@ func (q *QueryFeedback) Actual() int64 { // Update updates the query feedback. `startKey` is the start scan key of the partial result, used to find // the range for update. `counts` is the scan counts of each range, used to update the feedback count info. -func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) { +func (q *QueryFeedback) Update(startKey kv.Key, counts, ndvs []int64) { // Older versions do not have the counts info. if len(counts) == 0 { q.Invalidate() @@ -292,6 +293,7 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) { for i := 0; i < len(counts)/2; i++ { j := len(counts) - i - 1 counts[i], counts[j] = counts[j], counts[i] + ndvs[i], ndvs[j] = ndvs[j], ndvs[i] } } // Update the feedback count info. @@ -301,6 +303,7 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) { break } q.Feedback[i+idx].Count += count + q.Feedback[i+idx].Ndv += ndvs[i] } } @@ -503,23 +506,25 @@ type bucket = Feedback // calculates the count for each new bucket, merge the new bucket whose count // is smaller than "minBucketFraction*totalCount" with the next new bucket // until the last new bucket. -func (b *BucketFeedback) splitBucket(newNumBkts int, totalCount float64, originBucketCount float64) []bucket { +func (b *BucketFeedback) splitBucket(newNumBkts int, totalCount float64, originBucketCount float64, originalNdv int64) []bucket { // Split the bucket. bounds := b.getBoundaries(newNumBkts + 1) bkts := make([]bucket, 0, len(bounds)-1) sc := &stmtctx.StatementContext{TimeZone: time.UTC} for i := 1; i < len(bounds); i++ { - newBkt := bucket{&bounds[i-1], bounds[i].Clone(), 0, 0} + newBkt := bucket{&bounds[i-1], bounds[i].Clone(), 0, 0, 0} // get bucket count - _, ratio := getOverlapFraction(Feedback{b.lower, b.upper, int64(originBucketCount), 0}, newBkt) + _, ratio := getOverlapFraction(Feedback{b.lower, b.upper, int64(originBucketCount), 0, 0}, newBkt) countInNewBkt := originBucketCount * ratio - countInNewBkt = b.refineBucketCount(sc, newBkt, countInNewBkt) + ndvInNewBkt := int64(float64(originalNdv) * ratio) + countInNewBkt, ndvInNewBkt = b.refineBucketCount(sc, newBkt, countInNewBkt, ndvInNewBkt) // do not split if the count of result bucket is too small. if countInNewBkt < minBucketFraction*totalCount { bounds[i] = bounds[i-1] continue } newBkt.Count = int64(countInNewBkt) + newBkt.Ndv = ndvInNewBkt bkts = append(bkts, newBkt) // To guarantee that each bucket's range will not overlap. setNextValue(&bounds[i]) @@ -556,45 +561,51 @@ func getOverlapFraction(fb Feedback, bkt bucket) (float64, float64) { } // mergeFullyContainedFeedback merges the max fraction of non-overlapped feedbacks that are fully contained in the bucket. -func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContext, bkt bucket) (float64, float64, bool) { +func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContext, bkt bucket) (float64, float64, int64, bool) { feedbacks := make([]Feedback, 0, len(b.feedback)) // Get all the fully contained feedbacks. for _, fb := range b.feedback { res, err := outOfRange(sc, bkt.Lower, bkt.Upper, fb.Lower) if res != 0 || err != nil { - return 0, 0, false + return 0, 0, 0, false } res, err = outOfRange(sc, bkt.Lower, bkt.Upper, fb.Upper) if res != 0 || err != nil { - return 0, 0, false + return 0, 0, 0, false } feedbacks = append(feedbacks, fb) } if len(feedbacks) == 0 { - return 0, 0, false + return 0, 0, 0, false } sortedFBs, ok := NonOverlappedFeedbacks(sc, feedbacks) if !ok { - return 0, 0, false + return 0, 0, 0, false } - var sumFraction, sumCount float64 + var ( + sumFraction, sumCount float64 + ndv int64 + ) for _, fb := range sortedFBs { fraction, _ := getOverlapFraction(fb, bkt) sumFraction += fraction sumCount += float64(fb.Count) + ndv += fb.Ndv } - return sumFraction, sumCount, true + return sumFraction, sumCount, ndv, true } // refineBucketCount refine the newly split bucket count. It uses the feedback that overlaps most // with the bucket to get the bucket count. -func (b *BucketFeedback) refineBucketCount(sc *stmtctx.StatementContext, bkt bucket, defaultCount float64) float64 { +func (b *BucketFeedback) refineBucketCount(sc *stmtctx.StatementContext, bkt bucket, defaultCount float64, defaultNdv int64) (float64, int64) { bestFraction := minBucketFraction count := defaultCount - sumFraction, sumCount, ok := b.mergeFullyContainedFeedback(sc, bkt) + ndv := defaultNdv + sumFraction, sumCount, sumNdv, ok := b.mergeFullyContainedFeedback(sc, bkt) if ok && sumFraction > bestFraction { bestFraction = sumFraction count = sumCount / sumFraction + ndv = int64(float64(sumNdv) / sumFraction) } for _, fb := range b.feedback { fraction, ratio := getOverlapFraction(fb, bkt) @@ -602,9 +613,10 @@ func (b *BucketFeedback) refineBucketCount(sc *stmtctx.StatementContext, bkt buc if fraction > bestFraction { bestFraction = fraction count = float64(fb.Count) * ratio + ndv = int64(float64(fb.Ndv) * ratio) } } - return count + return count, ndv } const ( @@ -685,6 +697,7 @@ func mergeBuckets(bkts []bucket, isNewBuckets []bool, totalCount float64) []buck bkts[bktCursor-1].Upper = bkts[i].Upper bkts[bktCursor-1].Count += bkts[i].Count bkts[bktCursor-1].Repeat = bkts[i].Repeat + bkts[bktCursor-1].Ndv += bkts[i].Ndv idCursor++ } else { bkts[bktCursor] = bkts[i] @@ -705,13 +718,13 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6 bktFB, ok := bktID2FB[i] // No feedback, just use the original one. if !ok { - buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), h.bucketCount(i), h.Buckets[i].Repeat}) + buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), h.bucketCount(i), h.Buckets[i].Repeat, h.Buckets[i].NDV}) isNewBuckets = append(isNewBuckets, false) continue } // Distribute the total split count to bucket based on number of bucket feedback. newBktNums := splitCount * len(bktFB.feedback) / numTotalFBs - bkts := bktFB.splitBucket(newBktNums, h.TotalRowCount(), float64(h.bucketCount(i))) + bkts := bktFB.splitBucket(newBktNums, h.TotalRowCount(), float64(h.bucketCount(i)), h.Buckets[i].NDV) buckets = append(buckets, bkts...) if len(bkts) == 1 { isNewBuckets = append(isNewBuckets, false) @@ -729,13 +742,26 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6 } // UpdateHistogram updates the histogram according buckets. -func UpdateHistogram(h *Histogram, feedback *QueryFeedback) *Histogram { +func UpdateHistogram(h *Histogram, feedback *QueryFeedback, statsVer int) *Histogram { + if statsVer < Version2 { + // If it's the stats we haven't maintain the bucket NDV yet. Reset the ndv. + for i := range feedback.Feedback { + feedback.Feedback[i].Ndv = 0 + } + } buckets, isNewBuckets, totalCount := splitBuckets(h, feedback) buckets = mergeBuckets(buckets, isNewBuckets, float64(totalCount)) hist := buildNewHistogram(h, buckets) // Update the NDV of primary key column. if feedback.Tp == PkType { hist.NDV = int64(hist.TotalRowCount()) + // If we maintained the NDV of bucket. We can also update the total ndv. + } else if feedback.Tp == IndexType && statsVer == 2 { + totNdv := int64(0) + for _, bkt := range buckets { + totNdv += bkt.Ndv + } + hist.NDV = totNdv } return hist } @@ -757,7 +783,7 @@ func buildNewHistogram(h *Histogram, buckets []bucket) *Histogram { hist := NewHistogram(h.ID, h.NDV, h.NullCount, h.LastUpdateVersion, h.Tp, len(buckets), h.TotColSize) preCount := int64(0) for _, bkt := range buckets { - hist.AppendBucket(bkt.Lower, bkt.Upper, bkt.Count+preCount, bkt.Repeat) + hist.AppendBucketWithNDV(bkt.Lower, bkt.Upper, bkt.Count+preCount, bkt.Repeat, bkt.Ndv) preCount += bkt.Count } return hist @@ -776,6 +802,8 @@ type queryFeedback struct { // After that, it stores the Ranges for `HashValues`. Counts []int64 ColumnRanges [][]byte + + Ndvs []int64 } func encodePKFeedback(q *QueryFeedback) (*queryFeedback, error) { @@ -795,6 +823,7 @@ func encodePKFeedback(q *QueryFeedback) (*queryFeedback, error) { } pb.IntRanges = append(pb.IntRanges, low, high) pb.Counts = append(pb.Counts, fb.Count) + pb.Ndvs = append(pb.Ndvs, fb.Ndv) } return pb, nil } @@ -806,9 +835,11 @@ func encodeIndexFeedback(q *QueryFeedback) *queryFeedback { if bytes.Compare(kv.Key(fb.Lower.GetBytes()).PrefixNext(), fb.Upper.GetBytes()) >= 0 { pb.IndexPoints = append(pb.IndexPoints, fb.Lower.GetBytes()) pointCounts = append(pointCounts, fb.Count) + pb.Ndvs = append(pb.Ndvs, fb.Ndv) } else { pb.IndexRanges = append(pb.IndexRanges, fb.Lower.GetBytes(), fb.Upper.GetBytes()) pb.Counts = append(pb.Counts, fb.Count) + pb.Ndvs = append(pb.Ndvs, fb.Ndv) } } pb.Counts = append(pb.Counts, pointCounts...) @@ -859,7 +890,7 @@ func decodeFeedbackForIndex(q *QueryFeedback, pb *queryFeedback, c *CMSketch, t // decode the index range feedback for i := 0; i < len(pb.IndexRanges); i += 2 { lower, upper := types.NewBytesDatum(pb.IndexRanges[i]), types.NewBytesDatum(pb.IndexRanges[i+1]) - q.Feedback = append(q.Feedback, Feedback{&lower, &upper, pb.Counts[i/2], 0}) + q.Feedback = append(q.Feedback, Feedback{&lower, &upper, pb.Counts[i/2], 0, pb.Ndvs[i/2]}) } if c != nil { // decode the index point feedback, just set value count in CM Sketch @@ -888,7 +919,7 @@ func decodeFeedbackForPK(q *QueryFeedback, pb *queryFeedback, isUnsigned bool) { lower.SetInt64(pb.IntRanges[i]) upper.SetInt64(pb.IntRanges[i+1]) } - q.Feedback = append(q.Feedback, Feedback{&lower, &upper, pb.Counts[i/2], 0}) + q.Feedback = append(q.Feedback, Feedback{&lower, &upper, pb.Counts[i/2], 0, pb.Ndvs[i/2]}) } } @@ -927,7 +958,7 @@ func decodeFeedbackForColumn(q *QueryFeedback, pb *queryFeedback, ft *types.Fiel if err != nil { return err } - q.Feedback = append(q.Feedback, Feedback{&low[0], &high[0], pb.Counts[i/2], 0}) + q.Feedback = append(q.Feedback, Feedback{&low[0], &high[0], pb.Counts[i/2], 0, 0}) } return nil } diff --git a/statistics/feedback_test.go b/statistics/feedback_test.go index 37cd53c9746d2..12b87ffa57c3f 100644 --- a/statistics/feedback_test.go +++ b/statistics/feedback_test.go @@ -17,9 +17,11 @@ import ( "bytes" . "github.com/pingcap/check" + "github.com/pingcap/log" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" + "go.uber.org/zap" ) var _ = Suite(&testFeedbackSuite{}) @@ -27,22 +29,22 @@ var _ = Suite(&testFeedbackSuite{}) type testFeedbackSuite struct { } -func newFeedback(lower, upper, count int64) Feedback { +func newFeedback(lower, upper, count, ndv int64) Feedback { low, upp := types.NewIntDatum(lower), types.NewIntDatum(upper) - return Feedback{&low, &upp, count, 0} + return Feedback{&low, &upp, count, 0, ndv} } func genFeedbacks(lower, upper int64) []Feedback { var feedbacks []Feedback for i := lower; i < upper; i++ { - feedbacks = append(feedbacks, newFeedback(i, upper, upper-i+1)) + feedbacks = append(feedbacks, newFeedback(i, upper, upper-i+1, upper-i+1)) } return feedbacks } func appendBucket(h *Histogram, l, r int64) { lower, upper := types.NewIntDatum(l), types.NewIntDatum(r) - h.AppendBucket(&lower, &upper, 0, 0) + h.AppendBucketWithNDV(&lower, &upper, 0, 0, 0) } func genHistogram() *Histogram { @@ -57,11 +59,11 @@ func genHistogram() *Histogram { func (s *testFeedbackSuite) TestUpdateHistogram(c *C) { feedbacks := []Feedback{ - newFeedback(0, 1, 10000), - newFeedback(1, 2, 1), - newFeedback(2, 3, 3), - newFeedback(4, 5, 2), - newFeedback(5, 7, 4), + newFeedback(0, 1, 10000, 1), + newFeedback(1, 2, 1, 1), + newFeedback(2, 3, 3, 1), + newFeedback(4, 5, 2, 1), + newFeedback(5, 7, 4, 1), } feedbacks = append(feedbacks, genFeedbacks(8, 20)...) feedbacks = append(feedbacks, genFeedbacks(21, 60)...) @@ -71,52 +73,66 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) { originBucketCount := defaultBucketCount defaultBucketCount = 7 defer func() { defaultBucketCount = originBucketCount }() - c.Assert(UpdateHistogram(q.Hist, q).ToString(0), Equals, + c.Assert(UpdateHistogram(q.Hist, q, Version2).ToString(0), Equals, "column:0 ndv:10053 totColSize:0\n"+ - "num: 10001 lower_bound: 0 upper_bound: 2 repeats: 0\n"+ - "num: 7 lower_bound: 2 upper_bound: 5 repeats: 0\n"+ - "num: 4 lower_bound: 5 upper_bound: 7 repeats: 0\n"+ - "num: 11 lower_bound: 10 upper_bound: 20 repeats: 0\n"+ - "num: 19 lower_bound: 30 upper_bound: 49 repeats: 0\n"+ - "num: 11 lower_bound: 50 upper_bound: 60 repeats: 0") + "num: 10001 lower_bound: 0 upper_bound: 2 repeats: 0 ndv: 2\n"+ + "num: 7 lower_bound: 2 upper_bound: 5 repeats: 0 ndv: 2\n"+ + "num: 4 lower_bound: 5 upper_bound: 7 repeats: 0 ndv: 1\n"+ + "num: 11 lower_bound: 10 upper_bound: 20 repeats: 0 ndv: 11\n"+ + "num: 19 lower_bound: 30 upper_bound: 49 repeats: 0 ndv: 19\n"+ + "num: 11 lower_bound: 50 upper_bound: 60 repeats: 0 ndv: 11") } func (s *testFeedbackSuite) TestSplitBuckets(c *C) { // test bucket split - feedbacks := []Feedback{newFeedback(0, 1, 1)} + feedbacks := []Feedback{newFeedback(0, 1, 1, 1)} for i := 0; i < 100; i++ { - feedbacks = append(feedbacks, newFeedback(10, 15, 5)) + feedbacks = append(feedbacks, newFeedback(10, 15, 5, 5)) } q := NewQueryFeedback(0, genHistogram(), 0, false) q.Feedback = feedbacks + oldCnts := make([]int64, q.Hist.Len()) + for i := range q.Hist.Buckets { + oldCnts[i] = q.Hist.bucketCount(i) + } + oldNdvs := make([]int64, q.Hist.Len()) + for i := range q.Hist.Buckets { + oldNdvs[i] = q.Hist.Buckets[i].NDV + } + log.Warn("in test", zap.Int64s("ndvs", oldNdvs), zap.Int64s("cnts", oldCnts)) buckets, isNewBuckets, totalCount := splitBuckets(q.Hist, q) + ndvs := make([]int64, len(buckets)) + for i := range buckets { + ndvs[i] = buckets[i].Ndv + } + log.Warn("in test", zap.Int64s("ndvs", ndvs)) c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals, "column:0 ndv:0 totColSize:0\n"+ - "num: 1 lower_bound: 0 upper_bound: 1 repeats: 0\n"+ - "num: 0 lower_bound: 2 upper_bound: 3 repeats: 0\n"+ - "num: 0 lower_bound: 5 upper_bound: 7 repeats: 0\n"+ - "num: 5 lower_bound: 10 upper_bound: 15 repeats: 0\n"+ - "num: 0 lower_bound: 16 upper_bound: 20 repeats: 0\n"+ - "num: 0 lower_bound: 30 upper_bound: 50 repeats: 0") + "num: 1 lower_bound: 0 upper_bound: 1 repeats: 0 ndv: 1\n"+ + "num: 0 lower_bound: 2 upper_bound: 3 repeats: 0 ndv: 0\n"+ + "num: 0 lower_bound: 5 upper_bound: 7 repeats: 0 ndv: 0\n"+ + "num: 5 lower_bound: 10 upper_bound: 15 repeats: 0 ndv: 5\n"+ + "num: 0 lower_bound: 16 upper_bound: 20 repeats: 0 ndv: 0\n"+ + "num: 0 lower_bound: 30 upper_bound: 50 repeats: 0 ndv: 0") c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false}) c.Assert(totalCount, Equals, int64(6)) // test do not split if the bucket count is too small - feedbacks = []Feedback{newFeedback(0, 1, 100000)} + feedbacks = []Feedback{newFeedback(0, 1, 100000, 1)} for i := 0; i < 100; i++ { - feedbacks = append(feedbacks, newFeedback(10, 15, 1)) + feedbacks = append(feedbacks, newFeedback(10, 15, 1, 1)) } q = NewQueryFeedback(0, genHistogram(), 0, false) q.Feedback = feedbacks buckets, isNewBuckets, totalCount = splitBuckets(q.Hist, q) c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals, "column:0 ndv:0 totColSize:0\n"+ - "num: 100000 lower_bound: 0 upper_bound: 1 repeats: 0\n"+ - "num: 0 lower_bound: 2 upper_bound: 3 repeats: 0\n"+ - "num: 0 lower_bound: 5 upper_bound: 7 repeats: 0\n"+ - "num: 1 lower_bound: 10 upper_bound: 15 repeats: 0\n"+ - "num: 0 lower_bound: 16 upper_bound: 20 repeats: 0\n"+ - "num: 0 lower_bound: 30 upper_bound: 50 repeats: 0") + "num: 100000 lower_bound: 0 upper_bound: 1 repeats: 0 ndv: 1\n"+ + "num: 0 lower_bound: 2 upper_bound: 3 repeats: 0 ndv: 0\n"+ + "num: 0 lower_bound: 5 upper_bound: 7 repeats: 0 ndv: 0\n"+ + "num: 1 lower_bound: 10 upper_bound: 15 repeats: 0 ndv: 1\n"+ + "num: 0 lower_bound: 16 upper_bound: 20 repeats: 0 ndv: 0\n"+ + "num: 0 lower_bound: 30 upper_bound: 50 repeats: 0 ndv: 0") c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false}) c.Assert(totalCount, Equals, int64(100001)) @@ -124,16 +140,17 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) { h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0) appendBucket(h, 0, 1000000) h.Buckets[0].Count = 1000000 + h.Buckets[0].NDV = 1000000 feedbacks = feedbacks[:0] for i := 0; i < 100; i++ { - feedbacks = append(feedbacks, newFeedback(0, 10, 1)) + feedbacks = append(feedbacks, newFeedback(0, 10, 1, 1)) } q = NewQueryFeedback(0, h, 0, false) q.Feedback = feedbacks buckets, isNewBuckets, totalCount = splitBuckets(q.Hist, q) c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals, "column:0 ndv:0 totColSize:0\n"+ - "num: 1000000 lower_bound: 0 upper_bound: 1000000 repeats: 0") + "num: 1000000 lower_bound: 0 upper_bound: 1000000 repeats: 0 ndv: 1000000") c.Assert(isNewBuckets, DeepEquals, []bool{false}) c.Assert(totalCount, Equals, int64(1000000)) @@ -142,15 +159,15 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) { appendBucket(h, 0, 1000000) feedbacks = feedbacks[:0] for i := 0; i < 100; i++ { - feedbacks = append(feedbacks, newFeedback(0, 10, 1)) + feedbacks = append(feedbacks, newFeedback(0, 10, 1, 1)) } q = NewQueryFeedback(0, h, 0, false) q.Feedback = feedbacks buckets, isNewBuckets, totalCount = splitBuckets(q.Hist, q) c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals, "column:0 ndv:0 totColSize:0\n"+ - "num: 1 lower_bound: 0 upper_bound: 10 repeats: 0\n"+ - "num: 0 lower_bound: 11 upper_bound: 1000000 repeats: 0") + "num: 1 lower_bound: 0 upper_bound: 10 repeats: 0 ndv: 1\n"+ + "num: 0 lower_bound: 11 upper_bound: 1000000 repeats: 0 ndv: 0") c.Assert(isNewBuckets, DeepEquals, []bool{true, true}) c.Assert(totalCount, Equals, int64(1)) @@ -158,14 +175,14 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) { h = NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0) appendBucket(h, 0, 10000) feedbacks = feedbacks[:0] - feedbacks = append(feedbacks, newFeedback(0, 4000, 4000)) - feedbacks = append(feedbacks, newFeedback(4001, 9999, 1000)) + feedbacks = append(feedbacks, newFeedback(0, 4000, 4000, 4000)) + feedbacks = append(feedbacks, newFeedback(4001, 9999, 1000, 1000)) q = NewQueryFeedback(0, h, 0, false) q.Feedback = feedbacks buckets, isNewBuckets, totalCount = splitBuckets(q.Hist, q) c.Assert(buildNewHistogram(q.Hist, buckets).ToString(0), Equals, "column:0 ndv:0 totColSize:0\n"+ - "num: 5001 lower_bound: 0 upper_bound: 10000 repeats: 0") + "num: 5001 lower_bound: 0 upper_bound: 10000 repeats: 0 ndv: 5001") c.Assert(isNewBuckets, DeepEquals, []bool{false}) c.Assert(totalCount, Equals, int64(5001)) } @@ -176,6 +193,7 @@ func (s *testFeedbackSuite) TestMergeBuckets(c *C) { tests := []struct { points []int64 counts []int64 + ndvs []int64 isNewBuckets []bool bucketCount int result string @@ -183,37 +201,43 @@ func (s *testFeedbackSuite) TestMergeBuckets(c *C) { { points: []int64{1, 2}, counts: []int64{1}, + ndvs: []int64{1}, isNewBuckets: []bool{false}, bucketCount: 1, - result: "column:0 ndv:0 totColSize:0\nnum: 1 lower_bound: 1 upper_bound: 2 repeats: 0", + result: "column:0 ndv:0 totColSize:0\nnum: 1 lower_bound: 1 upper_bound: 2 repeats: 0 ndv: 1", }, { points: []int64{1, 2, 2, 3, 3, 4}, counts: []int64{100000, 1, 1}, + ndvs: []int64{1, 1, 1}, isNewBuckets: []bool{false, false, false}, bucketCount: 2, result: "column:0 ndv:0 totColSize:0\n" + - "num: 100000 lower_bound: 1 upper_bound: 2 repeats: 0\n" + - "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0", + "num: 100000 lower_bound: 1 upper_bound: 2 repeats: 0 ndv: 1\n" + + "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 2", }, // test do not Merge if the result bucket count is too large { points: []int64{1, 2, 2, 3, 3, 4, 4, 5}, counts: []int64{1, 1, 100000, 100000}, + ndvs: []int64{1, 1, 1, 1}, isNewBuckets: []bool{false, false, false, false}, bucketCount: 3, result: "column:0 ndv:0 totColSize:0\n" + - "num: 2 lower_bound: 1 upper_bound: 3 repeats: 0\n" + - "num: 100000 lower_bound: 3 upper_bound: 4 repeats: 0\n" + - "num: 100000 lower_bound: 4 upper_bound: 5 repeats: 0", + "num: 2 lower_bound: 1 upper_bound: 3 repeats: 0 ndv: 2\n" + + "num: 100000 lower_bound: 3 upper_bound: 4 repeats: 0 ndv: 1\n" + + "num: 100000 lower_bound: 4 upper_bound: 5 repeats: 0 ndv: 1", }, } for _, t := range tests { + if len(t.counts) != len(t.ndvs) { + c.Assert(false, IsTrue) + } bkts := make([]bucket, 0, len(t.counts)) totalCount := int64(0) for i := 0; i < len(t.counts); i++ { lower, upper := types.NewIntDatum(t.points[2*i]), types.NewIntDatum(t.points[2*i+1]) - bkts = append(bkts, bucket{&lower, &upper, t.counts[i], 0}) + bkts = append(bkts, bucket{&lower, &upper, t.counts[i], 0, t.ndvs[i]}) totalCount += t.counts[i] } defaultBucketCount = t.bucketCount @@ -232,8 +256,8 @@ func encodeInt(v int64) *types.Datum { func (s *testFeedbackSuite) TestFeedbackEncoding(c *C) { hist := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 0, 0) q := &QueryFeedback{Hist: hist, Tp: PkType} - q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(3), 1, 0}) - q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(5), 1, 0}) + q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(3), 1, 0, 1}) + q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(5), 1, 0, 1}) val, err := EncodeFeedback(q) c.Assert(err, IsNil) rq := &QueryFeedback{} @@ -246,8 +270,8 @@ func (s *testFeedbackSuite) TestFeedbackEncoding(c *C) { hist.Tp = types.NewFieldType(mysql.TypeBlob) q = &QueryFeedback{Hist: hist} - q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(3), 1, 0}) - q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(1), 1, 0}) + q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(3), 1, 0, 1}) + q.Feedback = append(q.Feedback, Feedback{encodeInt(0), encodeInt(1), 1, 0, 1}) val, err = EncodeFeedback(q) c.Assert(err, IsNil) rq = &QueryFeedback{} @@ -268,6 +292,9 @@ func (q *QueryFeedback) Equal(rq *QueryFeedback) bool { if fb.Count != rfb.Count { return false } + if fb.Ndv != rfb.Ndv { + return false + } if fb.Lower.Kind() == types.KindInt64 { if fb.Lower.GetInt64() != rfb.Lower.GetInt64() { return false diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index dd88e1f7a6289..51c99e093b245 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -718,7 +718,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if err != nil { return } - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%d, %d, %d, %d, %d, %d, X'%X', X'%X')", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes())) + sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values(%d, %d, %d, %d, %d, %d, X'%X', X'%X', %d)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[i].NDV)) } if isAnalyzed == 1 && len(lastAnalyzePos) > 0 { sqls = append(sqls, fmt.Sprintf("update mysql.stats_histograms set last_analyze_pos = X'%X' where table_id = %d and is_index = %d and hist_id = %d", lastAnalyzePos, tableID, isIndex, hg.ID)) @@ -751,7 +751,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error } func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) { - selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID) + selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID) rows, fields, err := reader.read(selSQL) if err != nil { return nil, errors.Trace(err) @@ -781,7 +781,7 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID } } totalCount += count - hg.AppendBucket(&lowerBound, &upperBound, totalCount, repeats) + hg.AppendBucketWithNDV(&lowerBound, &upperBound, totalCount, repeats, rows[i].GetInt64(4)) } hg.PreCalculateScalar() return hg, nil diff --git a/statistics/handle/update.go b/statistics/handle/update.go index dbc06d0595a26..d36e9bad343ac 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -506,8 +506,17 @@ func (h *Handle) DumpStatsFeedbackToKV() error { err = h.DumpFeedbackToKV(fb) } else { t, ok := h.statsCache.Load().(statsCache).tables[fb.PhysicalID] - if ok { + if !ok { + continue + } + idx, ok := t.Indices[fb.Hist.ID] + if !ok { + continue + } + if idx.StatsVer == statistics.Version1 { err = h.DumpFeedbackForIndex(fb, t) + } else { + err = h.DumpFeedbackToKV(fb) } } if err != nil { @@ -572,7 +581,7 @@ func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { ranFB = statistics.CleanRangeFeedbackByTopN(ranFB, idx.TopN) } newIdx.CMSketch, newIdx.TopN = statistics.UpdateCMSketchAndTopN(idx.CMSketch, idx.TopN, eqFB) - newIdx.Histogram = *statistics.UpdateHistogram(&idx.Histogram, &statistics.QueryFeedback{Feedback: ranFB}) + newIdx.Histogram = *statistics.UpdateHistogram(&idx.Histogram, &statistics.QueryFeedback{Feedback: ranFB}, int(idx.StatsVer)) newIdx.Histogram.PreCalculateScalar() newIdx.Flag = statistics.ResetAnalyzeFlag(newIdx.Flag) newTblStats.Indices[fb.Hist.ID] = &newIdx @@ -586,7 +595,7 @@ func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { _, ranFB := statistics.SplitFeedbackByQueryType(fb.Feedback) newFB := &statistics.QueryFeedback{Feedback: ranFB} newFB = newFB.DecodeIntValues() - newCol.Histogram = *statistics.UpdateHistogram(&col.Histogram, newFB) + newCol.Histogram = *statistics.UpdateHistogram(&col.Histogram, newFB, statistics.Version1) newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag) newTblStats.Columns[fb.Hist.ID] = &newCol } @@ -720,6 +729,7 @@ func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []ch idx, ok := tbl.Indices[histID] statsVer = idx.StatsVer if ok && idx.Histogram.Len() > 0 { + statsVer = idx.StatsVer idxHist := idx.Histogram hist = &idxHist cms = idx.CMSketch.Copy() @@ -763,7 +773,7 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error { } func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *statistics.QueryFeedback, hist *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int64) error { - hist = statistics.UpdateHistogram(hist, q) + hist = statistics.UpdateHistogram(hist, q, int(statsVersion)) err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, int(statsVersion), 0) metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc() return errors.Trace(err) diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go index 34889cc4aa536..71b50da123995 100644 --- a/statistics/handle/update_test.go +++ b/statistics/handle/update_test.go @@ -772,25 +772,25 @@ func (s *testStatsSuite) TestQueryFeedback(c *C) { // test primary key feedback sql: "select * from t where t.a <= 5 order by a desc", hist: "column:1 ndv:4 totColSize:0\n" + - "num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0\n" + - "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0\n" + - "num: 1 lower_bound: 4 upper_bound: 4 repeats: 1", + "num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" + + "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n" + + "num: 1 lower_bound: 4 upper_bound: 4 repeats: 1 ndv: 0", idxCols: 0, }, { // test index feedback by double read sql: "select * from t use index(idx) where t.b <= 5", hist: "index:1 ndv:2\n" + - "num: 3 lower_bound: -inf upper_bound: 5 repeats: 0\n" + - "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1", + "num: 3 lower_bound: -inf upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0", idxCols: 1, }, { // test index feedback by single read sql: "select b from t use index(idx) where t.b <= 5", hist: "index:1 ndv:2\n" + - "num: 3 lower_bound: -inf upper_bound: 5 repeats: 0\n" + - "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1", + "num: 3 lower_bound: -inf upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0", idxCols: 1, }, } @@ -892,22 +892,22 @@ func (s *testStatsSuite) TestQueryFeedbackForPartition(c *C) { // test primary key feedback sql: "select * from t where t.a <= 5", hist: "column:1 ndv:2 totColSize:0\n" + - "num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0\n" + - "num: 1 lower_bound: 2 upper_bound: 5 repeats: 0", + "num: 1 lower_bound: -9223372036854775808 upper_bound: 2 repeats: 0 ndv: 0\n" + + "num: 1 lower_bound: 2 upper_bound: 5 repeats: 0 ndv: 0", idxCols: 0, }, { // test index feedback by double read sql: "select * from t use index(idx) where t.b <= 5", hist: "index:1 ndv:1\n" + - "num: 2 lower_bound: -inf upper_bound: 6 repeats: 0", + "num: 2 lower_bound: -inf upper_bound: 6 repeats: 0 ndv: 0", idxCols: 1, }, { // test index feedback by single read sql: "select b from t use index(idx) where t.b <= 5", hist: "index:1 ndv:1\n" + - "num: 2 lower_bound: -inf upper_bound: 6 repeats: 0", + "num: 2 lower_bound: -inf upper_bound: 6 repeats: 0 ndv: 0", idxCols: 1, }, } @@ -1029,9 +1029,9 @@ func (s *testStatsSuite) TestUpdateStatsByLocalFeedback(c *C) { tbl := h.GetTableStats(tblInfo) c.Assert(tbl.Columns[tblInfo.Columns[0].ID].ToString(0), Equals, "column:1 ndv:3 totColSize:0\n"+ - "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1\n"+ - "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0\n"+ - "num: 1 lower_bound: 4 upper_bound: 9223372036854775807 repeats: 0") + "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0\n"+ + "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n"+ + "num: 1 lower_bound: 4 upper_bound: 9223372036854775807 repeats: 0 ndv: 0") sc := &stmtctx.StatementContext{TimeZone: time.Local} low, err := codec.EncodeKey(sc, nil, types.NewIntDatum(5)) c.Assert(err, IsNil) @@ -1039,8 +1039,8 @@ func (s *testStatsSuite) TestUpdateStatsByLocalFeedback(c *C) { c.Assert(tbl.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(low), Equals, uint64(2)) c.Assert(tbl.Indices[tblInfo.Indices[0].ID].ToString(1), Equals, "index:1 ndv:2\n"+ - "num: 2 lower_bound: -inf upper_bound: 5 repeats: 0\n"+ - "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1") + "num: 2 lower_bound: -inf upper_bound: 5 repeats: 0 ndv: 0\n"+ + "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1 ndv: 0") // Test that it won't cause panic after update. testKit.MustQuery("select * from t use index(idx) where b > 0") @@ -1085,9 +1085,9 @@ func (s *testStatsSuite) TestUpdatePartitionStatsByLocalFeedback(c *C) { tbl := h.GetPartitionStats(tblInfo, pid) c.Assert(tbl.Columns[tblInfo.Columns[0].ID].ToString(0), Equals, "column:1 ndv:3 totColSize:0\n"+ - "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1\n"+ - "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0\n"+ - "num: 1 lower_bound: 4 upper_bound: 9223372036854775807 repeats: 0") + "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0\n"+ + "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0 ndv: 0\n"+ + "num: 1 lower_bound: 4 upper_bound: 9223372036854775807 repeats: 0 ndv: 0") } type logHook struct { @@ -1160,17 +1160,17 @@ func (s *testStatsSuite) TestLogDetailedInfo(c *C) { }{ { sql: "select * from t where t.a <= 15", - result: "[stats-feedback] test.t, column=a, rangeStr=range: [-inf,8), actual: 8, expected: 8, buckets: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1, num: 8 lower_bound: 8 upper_bound: 15 repeats: 1}" + - "[stats-feedback] test.t, column=a, rangeStr=range: [8,15), actual: 8, expected: 7, buckets: {num: 8 lower_bound: 8 upper_bound: 15 repeats: 1}", + result: "[stats-feedback] test.t, column=a, rangeStr=range: [-inf,8), actual: 8, expected: 8, buckets: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1 ndv: 0, num: 8 lower_bound: 8 upper_bound: 15 repeats: 1 ndv: 0}" + + "[stats-feedback] test.t, column=a, rangeStr=range: [8,15), actual: 8, expected: 7, buckets: {num: 8 lower_bound: 8 upper_bound: 15 repeats: 1 ndv: 0}", }, { sql: "select * from t use index(idx) where t.b <= 15", - result: "[stats-feedback] test.t, index=idx, rangeStr=range: [-inf,8), actual: 8, expected: 8, histogram: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1, num: 8 lower_bound: 8 upper_bound: 15 repeats: 1}" + - "[stats-feedback] test.t, index=idx, rangeStr=range: [8,16), actual: 8, expected: 8, histogram: {num: 8 lower_bound: 8 upper_bound: 15 repeats: 1, num: 4 lower_bound: 16 upper_bound: 19 repeats: 1}", + result: "[stats-feedback] test.t, index=idx, rangeStr=range: [-inf,8), actual: 8, expected: 8, histogram: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1 ndv: 0, num: 8 lower_bound: 8 upper_bound: 15 repeats: 1 ndv: 0}" + + "[stats-feedback] test.t, index=idx, rangeStr=range: [8,16), actual: 8, expected: 8, histogram: {num: 8 lower_bound: 8 upper_bound: 15 repeats: 1 ndv: 0, num: 4 lower_bound: 16 upper_bound: 19 repeats: 1 ndv: 0}", }, { sql: "select b from t use index(idx_ba) where b = 1 and a <= 5", - result: "[stats-feedback] test.t, index=idx_ba, actual=1, equality=1, expected equality=1, range=range: [-inf,6], actual: -1, expected: 6, buckets: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1}", + result: "[stats-feedback] test.t, index=idx_ba, actual=1, equality=1, expected equality=1, range=range: [-inf,6], actual: -1, expected: 6, buckets: {num: 8 lower_bound: 0 upper_bound: 7 repeats: 1 ndv: 0}", }, { sql: "select b from t use index(idx_bc) where b = 1 and c <= 5", @@ -1526,9 +1526,9 @@ func (s *testStatsSuite) TestAbnormalIndexFeedback(c *C) { // The real count of `a = 1` is 0. sql: "select * from t where a = 1 and b < 21", hist: "column:2 ndv:20 totColSize:20\n" + - "num: 5 lower_bound: -9223372036854775808 upper_bound: 7 repeats: 0\n" + - "num: 4 lower_bound: 7 upper_bound: 14 repeats: 0\n" + - "num: 4 lower_bound: 14 upper_bound: 21 repeats: 0", + "num: 5 lower_bound: -9223372036854775808 upper_bound: 7 repeats: 0 ndv: 0\n" + + "num: 4 lower_bound: 7 upper_bound: 14 repeats: 0 ndv: 0\n" + + "num: 4 lower_bound: 14 upper_bound: 21 repeats: 0 ndv: 0", rangeID: tblInfo.Columns[1].ID, idxID: tblInfo.Indices[0].ID, eqCount: 3, @@ -1537,9 +1537,9 @@ func (s *testStatsSuite) TestAbnormalIndexFeedback(c *C) { // The real count of `b > 10` is 0. sql: "select * from t where a = 2 and b > 10", hist: "column:2 ndv:20 totColSize:20\n" + - "num: 5 lower_bound: -9223372036854775808 upper_bound: 7 repeats: 0\n" + - "num: 4 lower_bound: 7 upper_bound: 14 repeats: 0\n" + - "num: 5 lower_bound: 14 upper_bound: 9223372036854775807 repeats: 0", + "num: 5 lower_bound: -9223372036854775808 upper_bound: 7 repeats: 0 ndv: 0\n" + + "num: 4 lower_bound: 7 upper_bound: 14 repeats: 0 ndv: 0\n" + + "num: 5 lower_bound: 14 upper_bound: 9223372036854775807 repeats: 0 ndv: 0", rangeID: tblInfo.Columns[1].ID, idxID: tblInfo.Indices[0].ID, eqCount: 3, @@ -1597,25 +1597,25 @@ func (s *testStatsSuite) TestFeedbackRanges(c *C) { { sql: "select * from t where a <= 50 or (a > 130 and a < 140)", hist: "column:1 ndv:30 totColSize:0\n" + - "num: 8 lower_bound: -128 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0", + "num: 8 lower_bound: -128 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0 ndv: 0", colID: 1, }, { sql: "select * from t where a >= 10", hist: "column:1 ndv:30 totColSize:0\n" + - "num: 8 lower_bound: -128 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 127 repeats: 0", + "num: 8 lower_bound: -128 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 127 repeats: 0 ndv: 0", colID: 1, }, { sql: "select * from t use index(idx) where a = 1 and (b <= 50 or (b > 130 and b < 140))", hist: "column:2 ndv:20 totColSize:30\n" + - "num: 8 lower_bound: -128 upper_bound: 7 repeats: 0\n" + - "num: 8 lower_bound: 7 upper_bound: 14 repeats: 0\n" + - "num: 7 lower_bound: 14 upper_bound: 51 repeats: 0", + "num: 8 lower_bound: -128 upper_bound: 7 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 7 upper_bound: 14 repeats: 0 ndv: 0\n" + + "num: 7 lower_bound: 14 upper_bound: 51 repeats: 0 ndv: 0", colID: 2, }, } @@ -1677,33 +1677,33 @@ func (s *testStatsSuite) TestUnsignedFeedbackRanges(c *C) { { sql: "select * from t where a <= 50", hist: "column:1 ndv:30 totColSize:10\n" + - "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0", + "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0 ndv: 0", tblName: "t", }, { sql: "select count(*) from t", hist: "column:1 ndv:30 totColSize:10\n" + - "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 255 repeats: 0", + "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 255 repeats: 0 ndv: 0", tblName: "t", }, { sql: "select * from t1 where a <= 50", hist: "column:1 ndv:30 totColSize:10\n" + - "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0", + "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 50 repeats: 0 ndv: 0", tblName: "t1", }, { sql: "select count(*) from t1", hist: "column:1 ndv:30 totColSize:10\n" + - "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0\n" + - "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0\n" + - "num: 14 lower_bound: 16 upper_bound: 18446744073709551615 repeats: 0", + "num: 8 lower_bound: 0 upper_bound: 8 repeats: 0 ndv: 0\n" + + "num: 8 lower_bound: 8 upper_bound: 16 repeats: 0 ndv: 0\n" + + "num: 14 lower_bound: 16 upper_bound: 18446744073709551615 repeats: 0 ndv: 0", tblName: "t1", }, } diff --git a/statistics/histogram.go b/statistics/histogram.go index 070590c33fc0f..7b71d0116463d 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -81,6 +81,7 @@ type Histogram struct { type Bucket struct { Count int64 Repeat int64 + NDV int64 } type scalar struct { @@ -210,16 +211,26 @@ func (c *Column) AvgColSizeListInDisk(count int64) float64 { // AppendBucket appends a bucket into `hg`. func (hg *Histogram) AppendBucket(lower *types.Datum, upper *types.Datum, count, repeat int64) { - hg.Buckets = append(hg.Buckets, Bucket{Count: count, Repeat: repeat}) + hg.AppendBucketWithNDV(lower, upper, count, repeat, 0) +} + +// AppendBucketWithNDV appends a bucket into `hg` and set value for field `NDV`. +func (hg *Histogram) AppendBucketWithNDV(lower *types.Datum, upper *types.Datum, count, repeat, ndv int64) { + hg.Buckets = append(hg.Buckets, Bucket{Count: count, Repeat: repeat, NDV: ndv}) hg.Bounds.AppendDatum(0, lower) hg.Bounds.AppendDatum(0, upper) } -func (hg *Histogram) updateLastBucket(upper *types.Datum, count, repeat int64) { - len := hg.Len() - hg.Bounds.TruncateTo(2*len - 1) +func (hg *Histogram) updateLastBucket(upper *types.Datum, count, repeat int64, needBucketNDV bool) { + l := hg.Len() + hg.Bounds.TruncateTo(2*l - 1) hg.Bounds.AppendDatum(0, upper) - hg.Buckets[len-1] = Bucket{Count: count, Repeat: repeat} + // The sampling case doesn't hold NDV since the low sampling rate. So check the NDV here. + if needBucketNDV && hg.Buckets[l-1].NDV > 0 { + hg.Buckets[l-1].NDV++ + } + hg.Buckets[l-1].Count = count + hg.Buckets[l-1].Repeat = repeat } // DecodeTo decodes the histogram bucket values into `Tp`. @@ -270,6 +281,7 @@ func HistogramEqual(a, b *Histogram, ignoreID bool) bool { } // constants for stats version. These const can be used for solving compatibility issue. +// If the version number is 0, it means the most original statistics. const ( Version0 = 0 // In Version1 @@ -327,7 +339,7 @@ func (hg *Histogram) BucketToString(bktID, idxCols int) string { terror.Log(errors.Trace(err)) lowerVal, err := ValueToString(nil, hg.GetLower(bktID), idxCols, nil) terror.Log(errors.Trace(err)) - return fmt.Sprintf("num: %d lower_bound: %s upper_bound: %s repeats: %d", hg.bucketCount(bktID), lowerVal, upperVal, hg.Buckets[bktID].Repeat) + return fmt.Sprintf("num: %d lower_bound: %s upper_bound: %s repeats: %d ndv: %d", hg.bucketCount(bktID), lowerVal, upperVal, hg.Buckets[bktID].Repeat, hg.Buckets[bktID].NDV) } // RemoveIdxVals remove the given values from the histogram. @@ -346,6 +358,9 @@ func (hg *Histogram) RemoveIdxVals(idxValCntPairs []TopNMeta) { break } totalSubCnt += int64(idxValCntPairs[pairIdx].Count) + if hg.Buckets[bktIdx].NDV > 0 { + hg.Buckets[bktIdx].NDV-- + } pairIdx++ if cmpResult == 0 { hg.Buckets[bktIdx].Repeat = 0 @@ -374,13 +389,16 @@ func (hg *Histogram) ToString(idxCols int) string { } // equalRowCount estimates the row count where the column equals to value. -func (hg *Histogram) equalRowCount(value types.Datum) float64 { +func (hg *Histogram) equalRowCount(value types.Datum, hasBucketNDV bool) float64 { index, match := hg.Bounds.LowerBound(0, &value) // Since we store the lower and upper bound together, if the index is an odd number, then it points to a upper bound. if index%2 == 1 { if match { return float64(hg.Buckets[index/2].Repeat) } + if hasBucketNDV && hg.Buckets[index/2].NDV > 1 { + return float64(hg.bucketCount(index/2)-hg.Buckets[index/2].Repeat) / float64(hg.Buckets[index/2].NDV-1) + } return hg.notNullCount() / float64(hg.NDV) } if match { @@ -388,14 +406,18 @@ func (hg *Histogram) equalRowCount(value types.Datum) float64 { if cmp(hg.Bounds.GetRow(index), 0, hg.Bounds.GetRow(index+1), 0) == 0 { return float64(hg.Buckets[index/2].Repeat) } + if hasBucketNDV && hg.Buckets[index/2].NDV > 1 { + return float64(hg.bucketCount(index/2)-hg.Buckets[index/2].Repeat) / float64(hg.Buckets[index/2].NDV-1) + } return hg.notNullCount() / float64(hg.NDV) } return 0 } // greaterRowCount estimates the row count where the column greater than value. +// It's deprecated. Only used for test. func (hg *Histogram) greaterRowCount(value types.Datum) float64 { - gtCount := hg.notNullCount() - hg.lessRowCount(value) - hg.equalRowCount(value) + gtCount := hg.notNullCount() - hg.lessRowCount(value) - hg.equalRowCount(value, false) return math.Max(0, gtCount) } @@ -481,7 +503,9 @@ func (hg *Histogram) mergeBuckets(bucketIdx int) { curBuck := 0 c := chunk.NewChunkWithCapacity([]*types.FieldType{hg.Tp}, bucketIdx) for i := 0; i+1 <= bucketIdx; i += 2 { - hg.Buckets[curBuck] = hg.Buckets[i+1] + hg.Buckets[curBuck].NDV = hg.Buckets[i+1].NDV + hg.Buckets[i].NDV + hg.Buckets[curBuck].Count = hg.Buckets[i+1].Count + hg.Buckets[curBuck].Repeat = hg.Buckets[i+1].Repeat c.AppendDatum(0, hg.GetLower(i)) c.AppendDatum(0, hg.GetUpper(i+1)) curBuck++ @@ -667,6 +691,7 @@ func HistogramToProto(hg *Histogram) *tipb.Histogram { LowerBound: hg.GetLower(i).GetBytes(), UpperBound: hg.GetUpper(i).GetBytes(), Repeats: hg.Buckets[i].Repeat, + Ndv: &hg.Buckets[i].NDV, } protoHg.Buckets = append(protoHg.Buckets, bkt) } @@ -681,7 +706,11 @@ func HistogramFromProto(protoHg *tipb.Histogram) *Histogram { hg := NewHistogram(0, protoHg.Ndv, 0, 0, tp, len(protoHg.Buckets), 0) for _, bucket := range protoHg.Buckets { lower, upper := types.NewBytesDatum(bucket.LowerBound), types.NewBytesDatum(bucket.UpperBound) - hg.AppendBucket(&lower, &upper, bucket.Count, bucket.Repeats) + if bucket.Ndv != nil { + hg.AppendBucketWithNDV(&lower, &upper, bucket.Count, bucket.Repeats, *bucket.Ndv) + } else { + hg.AppendBucket(&lower, &upper, bucket.Count, bucket.Repeats) + } } return hg } @@ -699,7 +728,7 @@ func (hg *Histogram) IsIndexHist() bool { } // MergeHistograms merges two histograms. -func MergeHistograms(sc *stmtctx.StatementContext, lh *Histogram, rh *Histogram, bucketSize int) (*Histogram, error) { +func MergeHistograms(sc *stmtctx.StatementContext, lh *Histogram, rh *Histogram, bucketSize int, statsVer int) (*Histogram, error) { if lh.Len() == 0 { return rh, nil } @@ -715,7 +744,10 @@ func MergeHistograms(sc *stmtctx.StatementContext, lh *Histogram, rh *Histogram, offset := int64(0) if cmp == 0 { lh.NDV-- - lh.updateLastBucket(rh.GetUpper(0), lh.Buckets[lLen-1].Count+rh.Buckets[0].Count, rh.Buckets[0].Repeat) + if rh.Buckets[0].NDV > 0 { + lh.Buckets[lLen-1].NDV += rh.Buckets[0].NDV - 1 + } + lh.updateLastBucket(rh.GetUpper(0), lh.Buckets[lLen-1].Count+rh.Buckets[0].Count, rh.Buckets[0].Repeat, false) offset = rh.Buckets[0].Count rh.popFirstBucket() } @@ -741,6 +773,10 @@ func MergeHistograms(sc *stmtctx.StatementContext, lh *Histogram, rh *Histogram, rAvg *= 2 } for i := 0; i < rh.Len(); i++ { + if statsVer == Version2 { + lh.AppendBucketWithNDV(rh.GetLower(i), rh.GetUpper(i), rh.Buckets[i].Count+lCount-offset, rh.Buckets[i].Repeat, rh.Buckets[i].NDV) + continue + } lh.AppendBucket(rh.GetLower(i), rh.GetUpper(i), rh.Buckets[i].Count+lCount-offset, rh.Buckets[i].Repeat) } for lh.Len() > bucketSize { @@ -910,7 +946,7 @@ func (c *Column) equalRowCount(sc *stmtctx.StatementContext, val types.Datum, mo count, err := queryValue(sc, c.CMSketch, c.TopN, val) return float64(count), errors.Trace(err) } - return c.Histogram.equalRowCount(val), nil + return c.Histogram.equalRowCount(val, false), nil } // Stats version == 2 // 1. try to find this value in TopN @@ -1078,20 +1114,28 @@ func (idx *Index) MemoryUsage() (sum int64) { var nullKeyBytes, _ = codec.EncodeKey(nil, nil, types.NewDatum(nil)) -func (idx *Index) equalRowCount(sc *stmtctx.StatementContext, b []byte, modifyCount int64) (float64, error) { +func (idx *Index) equalRowCount(b []byte, modifyCount int64) float64 { if len(idx.Info.Columns) == 1 { if bytes.Equal(b, nullKeyBytes) { - return float64(idx.NullCount), nil + return float64(idx.NullCount) } } val := types.NewBytesDatum(b) if idx.NDV > 0 && idx.outOfRange(val) { - return outOfRangeEQSelectivity(idx.NDV, modifyCount, int64(idx.TotalRowCount())) * idx.TotalRowCount(), nil + return outOfRangeEQSelectivity(idx.NDV, modifyCount, int64(idx.TotalRowCount())) * idx.TotalRowCount() } - if idx.CMSketch != nil { - return float64(idx.QueryBytes(b)), nil + if idx.CMSketch != nil && idx.StatsVer < Version2 { + return float64(idx.QueryBytes(b)) } - return idx.Histogram.equalRowCount(val), nil + // If it's version2, query the top-n first. + if idx.StatsVer == Version2 { + count, found := idx.TopN.QueryTopN(b) + if found { + return float64(count) + } + return idx.Histogram.equalRowCount(val, true) + } + return idx.Histogram.equalRowCount(val, false) } // QueryBytes is used to query the count of specified bytes. @@ -1128,10 +1172,7 @@ func (idx *Index) GetRowCount(sc *stmtctx.StatementContext, indexRanges []*range totalCount += 1 continue } - count, err := idx.equalRowCount(sc, lb, modifyCount) - if err != nil { - return 0, err - } + count := idx.equalRowCount(lb, modifyCount) totalCount += count continue } diff --git a/statistics/histogram_test.go b/statistics/histogram_test.go index cd0196501a1d4..b017fe1bcf0f8 100644 --- a/statistics/histogram_test.go +++ b/statistics/histogram_test.go @@ -49,11 +49,11 @@ func (s *testStatisticsSuite) TestNewHistogramBySelectivity(c *C) { node.Ranges = append(node.Ranges, &ranger.Range{LowVal: types.MakeDatums(13), HighVal: types.MakeDatums(13)}) node.Ranges = append(node.Ranges, &ranger.Range{LowVal: types.MakeDatums(25), HighVal: []types.Datum{types.MaxValueDatum()}}) intColResult := `column:1 ndv:16 totColSize:0 -num: 30 lower_bound: 0 upper_bound: 2 repeats: 10 -num: 11 lower_bound: 6 upper_bound: 8 repeats: 0 -num: 30 lower_bound: 9 upper_bound: 11 repeats: 0 -num: 1 lower_bound: 12 upper_bound: 14 repeats: 0 -num: 30 lower_bound: 27 upper_bound: 29 repeats: 0` +num: 30 lower_bound: 0 upper_bound: 2 repeats: 10 ndv: 0 +num: 11 lower_bound: 6 upper_bound: 8 repeats: 0 ndv: 0 +num: 30 lower_bound: 9 upper_bound: 11 repeats: 0 ndv: 0 +num: 1 lower_bound: 12 upper_bound: 14 repeats: 0 ndv: 0 +num: 30 lower_bound: 27 upper_bound: 29 repeats: 0 ndv: 0` stringCol := &Column{} stringCol.Histogram = *NewHistogram(2, 15, 30, 0, types.NewFieldType(mysql.TypeString), chunk.InitialCapacity, 0) @@ -82,11 +82,11 @@ num: 30 lower_bound: 27 upper_bound: 29 repeats: 0` node2.Ranges = append(node2.Ranges, &ranger.Range{LowVal: types.MakeDatums("ddd"), HighVal: types.MakeDatums("fff")}) node2.Ranges = append(node2.Ranges, &ranger.Range{LowVal: types.MakeDatums("ggg"), HighVal: []types.Datum{types.MaxValueDatum()}}) stringColResult := `column:2 ndv:9 totColSize:0 -num: 60 lower_bound: a upper_bound: aaaabbbb repeats: 0 -num: 52 lower_bound: bbbb upper_bound: fdsfdsfds repeats: 0 -num: 54 lower_bound: kkkkk upper_bound: ooooo repeats: 0 -num: 60 lower_bound: oooooo upper_bound: sssss repeats: 0 -num: 60 lower_bound: ssssssu upper_bound: yyyyy repeats: 0` +num: 60 lower_bound: a upper_bound: aaaabbbb repeats: 0 ndv: 0 +num: 52 lower_bound: bbbb upper_bound: fdsfdsfds repeats: 0 ndv: 0 +num: 54 lower_bound: kkkkk upper_bound: ooooo repeats: 0 ndv: 0 +num: 60 lower_bound: oooooo upper_bound: sssss repeats: 0 ndv: 0 +num: 60 lower_bound: ssssssu upper_bound: yyyyy repeats: 0 ndv: 0` newColl := coll.NewHistCollBySelectivity(sc, []*StatsNode{node, node2}) c.Assert(newColl.Columns[1].String(), Equals, intColResult) @@ -110,10 +110,10 @@ num: 60 lower_bound: ssssssu upper_bound: yyyyy repeats: 0` node3.Ranges = append(node3.Ranges, &ranger.Range{LowVal: types.MakeDatums(10), HighVal: types.MakeDatums(13)}) idxResult := `index:0 ndv:7 -num: 30 lower_bound: 0 upper_bound: 2 repeats: 10 -num: 30 lower_bound: 3 upper_bound: 5 repeats: 10 -num: 30 lower_bound: 9 upper_bound: 11 repeats: 10 -num: 30 lower_bound: 12 upper_bound: 14 repeats: 10` +num: 30 lower_bound: 0 upper_bound: 2 repeats: 10 ndv: 0 +num: 30 lower_bound: 3 upper_bound: 5 repeats: 10 ndv: 0 +num: 30 lower_bound: 9 upper_bound: 11 repeats: 10 ndv: 0 +num: 30 lower_bound: 12 upper_bound: 14 repeats: 10 ndv: 0` newColl = coll.NewHistCollBySelectivity(sc, []*StatsNode{node3}) c.Assert(newColl.Indices[0].String(), Equals, idxResult) diff --git a/statistics/sample_test.go b/statistics/sample_test.go index 34d3f31117db9..1a9647505b547 100644 --- a/statistics/sample_test.go +++ b/statistics/sample_test.go @@ -60,7 +60,7 @@ func (s *testSampleSuite) TestCollectColumnStats(c *C) { Sc: sc, RecordSet: s.rs, ColLen: 1, - PkBuilder: NewSortedBuilder(sc, 256, 1, types.NewFieldType(mysql.TypeLonglong)), + PkBuilder: NewSortedBuilder(sc, 256, 1, types.NewFieldType(mysql.TypeLonglong), Version2), MaxSampleSize: 10000, MaxBucketSize: 256, MaxFMSketchSize: 1000, diff --git a/statistics/statistics_test.go b/statistics/statistics_test.go index d7d6aa6bb2226..255914c3caf4a 100644 --- a/statistics/statistics_test.go +++ b/statistics/statistics_test.go @@ -180,7 +180,7 @@ func encodeKey(key types.Datum) types.Datum { } func buildPK(sctx sessionctx.Context, numBuckets, id int64, records sqlexec.RecordSet) (int64, *Histogram, error) { - b := NewSortedBuilder(sctx.GetSessionVars().StmtCtx, numBuckets, id, types.NewFieldType(mysql.TypeLonglong)) + b := NewSortedBuilder(sctx.GetSessionVars().StmtCtx, numBuckets, id, types.NewFieldType(mysql.TypeLonglong), Version1) ctx := context.Background() for { req := records.NewChunk() @@ -204,7 +204,7 @@ func buildPK(sctx sessionctx.Context, numBuckets, id int64, records sqlexec.Reco } func buildIndex(sctx sessionctx.Context, numBuckets, id int64, records sqlexec.RecordSet) (int64, *Histogram, *CMSketch, error) { - b := NewSortedBuilder(sctx.GetSessionVars().StmtCtx, numBuckets, id, types.NewFieldType(mysql.TypeBlob)) + b := NewSortedBuilder(sctx.GetSessionVars().StmtCtx, numBuckets, id, types.NewFieldType(mysql.TypeBlob), Version1) cms := NewCMSketch(8, 2048) ctx := context.Background() req := records.NewChunk() @@ -259,7 +259,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) { checkRepeats(c, col) col.PreCalculateScalar() c.Check(col.Len(), Equals, 226) - count := col.equalRowCount(types.NewIntDatum(1000)) + count := col.equalRowCount(types.NewIntDatum(1000), false) c.Check(int(count), Equals, 0) count = col.lessRowCount(types.NewIntDatum(1000)) c.Check(int(count), Equals, 10000) @@ -271,7 +271,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) { c.Check(int(count), Equals, 100000) count = col.greaterRowCount(types.NewIntDatum(200000000)) c.Check(count, Equals, 0.0) - count = col.equalRowCount(types.NewIntDatum(200000000)) + count = col.equalRowCount(types.NewIntDatum(200000000), false) c.Check(count, Equals, 0.0) count = col.BetweenRowCount(types.NewIntDatum(3000), types.NewIntDatum(3500)) c.Check(int(count), Equals, 4994) @@ -324,7 +324,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) { checkRepeats(c, col) col.PreCalculateScalar() c.Check(int(tblCount), Equals, 100000) - count = col.equalRowCount(encodeKey(types.NewIntDatum(10000))) + count = col.equalRowCount(encodeKey(types.NewIntDatum(10000)), false) c.Check(int(count), Equals, 1) count = col.lessRowCount(encodeKey(types.NewIntDatum(20000))) c.Check(int(count), Equals, 19999) @@ -341,7 +341,7 @@ func (s *testStatisticsSuite) TestBuild(c *C) { checkRepeats(c, col) col.PreCalculateScalar() c.Check(int(tblCount), Equals, 100000) - count = col.equalRowCount(types.NewIntDatum(10000)) + count = col.equalRowCount(types.NewIntDatum(10000), false) c.Check(int(count), Equals, 1) count = col.lessRowCount(types.NewIntDatum(20000)) c.Check(int(count), Equals, 20000) @@ -427,7 +427,7 @@ func (s *testStatisticsSuite) TestMergeHistogram(c *C) { for _, t := range tests { lh := mockHistogram(t.leftLower, t.leftNum) rh := mockHistogram(t.rightLower, t.rightNum) - h, err := MergeHistograms(sc, lh, rh, bucketCount) + h, err := MergeHistograms(sc, lh, rh, bucketCount, Version1) c.Assert(err, IsNil) c.Assert(h.NDV, Equals, t.ndv) c.Assert(h.Len(), Equals, t.bucketNum) diff --git a/statistics/table.go b/statistics/table.go index 220d7dc0e162c..91884a9f604c0 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -326,7 +326,7 @@ func (coll *HistColl) GetRowCountByIndexRanges(sc *stmtctx.StatementContext, idx } var result float64 var err error - if idx.CMSketch != nil && idx.StatsVer != Version0 { + if idx.CMSketch != nil && idx.StatsVer == Version1 { result, err = coll.getIndexRowCount(sc, idxID, indexRanges) } else { result, err = idx.GetRowCount(sc, indexRanges, coll.ModifyCount) diff --git a/statistics/testdata/stats_suite_out.json b/statistics/testdata/stats_suite_out.json index 624f8b2a65a10..2a9895c8cc238 100644 --- a/statistics/testdata/stats_suite_out.json +++ b/statistics/testdata/stats_suite_out.json @@ -92,8 +92,8 @@ "Name": "TestCollationColumnEstimate", "Cases": [ [ - "test t a 0 0 2 2 \u0000A\u0000A\u0000A \u0000A\u0000A\u0000A", - "test t a 0 1 4 2 \u0000B\u0000B\u0000B \u0000B\u0000B\u0000B" + "test t a 0 0 2 2 \u0000A\u0000A\u0000A \u0000A\u0000A\u0000A 0", + "test t a 0 1 4 2 \u0000B\u0000B\u0000B \u0000B\u0000B\u0000B 0" ], [ "TableReader_7 2.00 root data:Selection_6", diff --git a/store/mockstore/mocktikv/analyze.go b/store/mockstore/mocktikv/analyze.go index fa0d9384694c8..a575f5536015d 100644 --- a/store/mockstore/mocktikv/analyze.go +++ b/store/mockstore/mocktikv/analyze.go @@ -81,7 +81,7 @@ func (h *rpcHandler) handleAnalyzeIndexReq(req *coprocessor.Request, analyzeReq execDetail: new(execDetail), hdStatus: tablecodec.HandleNotNeeded, } - statsBuilder := statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob)) + statsBuilder := statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob), statistics.Version1) var cms *statistics.CMSketch if analyzeReq.IdxReq.CmsketchDepth != nil && analyzeReq.IdxReq.CmsketchWidth != nil { cms = statistics.NewCMSketch(*analyzeReq.IdxReq.CmsketchDepth, *analyzeReq.IdxReq.CmsketchWidth) @@ -212,7 +212,7 @@ func (h *rpcHandler) handleAnalyzeColumnsReq(req *coprocessor.Request, analyzeRe ColsFieldType: fts, } if pkID != -1 { - builder.PkBuilder = statistics.NewSortedBuilder(sc, builder.MaxBucketSize, pkID, types.NewFieldType(mysql.TypeBlob)) + builder.PkBuilder = statistics.NewSortedBuilder(sc, builder.MaxBucketSize, pkID, types.NewFieldType(mysql.TypeBlob), statistics.Version1) } if colReq.CmsketchWidth != nil && colReq.CmsketchDepth != nil { builder.CMSketchWidth = *colReq.CmsketchWidth diff --git a/store/mockstore/unistore/cophandler/analyze.go b/store/mockstore/unistore/cophandler/analyze.go index 329335a70af5e..f2e980023163b 100644 --- a/store/mockstore/unistore/cophandler/analyze.go +++ b/store/mockstore/unistore/cophandler/analyze.go @@ -83,7 +83,7 @@ func handleAnalyzeIndexReq(dbReader *dbreader.DBReader, rans []kv.KeyRange, anal } processor := &analyzeIndexProcessor{ colLen: int(analyzeReq.IdxReq.NumColumns), - statsBuilder: statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob)), + statsBuilder: statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob), int(statsVer)), statsVer: statsVer, } if analyzeReq.IdxReq.TopNSize != nil { @@ -133,9 +133,13 @@ func handleAnalyzeIndexReq(dbReader *dbreader.DBReader, rans []kv.KeyRange, anal } func handleAnalyzeCommonHandleReq(dbReader *dbreader.DBReader, rans []kv.KeyRange, analyzeReq *tipb.AnalyzeReq, startTS uint64) (*coprocessor.Response, error) { + statsVer := statistics.Version1 + if analyzeReq.IdxReq.Version != nil { + statsVer = int(*analyzeReq.IdxReq.Version) + } processor := &analyzeCommonHandleProcessor{ colLen: int(analyzeReq.IdxReq.NumColumns), - statsBuilder: statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob)), + statsBuilder: statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob), statsVer), } if analyzeReq.IdxReq.CmsketchDepth != nil && analyzeReq.IdxReq.CmsketchWidth != nil { processor.cms = statistics.NewCMSketch(*analyzeReq.IdxReq.CmsketchDepth, *analyzeReq.IdxReq.CmsketchWidth) @@ -308,8 +312,12 @@ func handleAnalyzeColumnsReq(dbReader *dbreader.DBReader, rans []kv.KeyRange, an Collators: collators, ColsFieldType: fts, } + statsVer := statistics.Version1 + if analyzeReq.ColReq.Version != nil { + statsVer = int(*analyzeReq.ColReq.Version) + } if pkID != -1 { - builder.PkBuilder = statistics.NewSortedBuilder(sc, builder.MaxBucketSize, pkID, types.NewFieldType(mysql.TypeBlob)) + builder.PkBuilder = statistics.NewSortedBuilder(sc, builder.MaxBucketSize, pkID, types.NewFieldType(mysql.TypeBlob), statsVer) } if colReq.CmsketchWidth != nil && colReq.CmsketchDepth != nil { builder.CMSketchWidth = *colReq.CmsketchWidth diff --git a/store/mockstore/unistore/cophandler/closure_exec.go b/store/mockstore/unistore/cophandler/closure_exec.go index 144ecbd1cf22e..58e20d393a63c 100644 --- a/store/mockstore/unistore/cophandler/closure_exec.go +++ b/store/mockstore/unistore/cophandler/closure_exec.go @@ -296,6 +296,10 @@ func newClosureExecutor(dagCtx *dagContext, outputOffsets []uint32, scanExec *ti e.unique = idxScan.GetUnique() e.scanCtx.desc = idxScan.Desc e.initIdxScanCtx(idxScan) + if collectRangeCounts { + e.idxScanCtx.collectNDV = true + e.idxScanCtx.prevVals = make([][]byte, e.idxScanCtx.columnLen) + } e.scanType = IndexScan case tipb.ExecType_TypeExchangeReceiver: dagCtx.fillColumnInfo(scanExec.ExchangeReceiver.FieldTypes) @@ -320,6 +324,7 @@ func newClosureExecutor(dagCtx *dagContext, outputOffsets []uint32, scanExec *ti } if collectRangeCounts { e.counts = make([]int64, len(ranges)) + e.ndvs = make([]int64, len(ranges)) } e.kvRanges = ranges e.scanCtx.chk = chunk.NewChunkWithCapacity(e.fieldTps, 32) @@ -603,6 +608,8 @@ type closureExecutor struct { processor closureProcessor counts []int64 + ndvs []int64 + curNdv int64 } func pbChunkToChunk(pbChk tipb.Chunk, chk *chunk.Chunk, fieldTypes []*types.FieldType) error { @@ -837,6 +844,8 @@ type idxScanCtx struct { colInfos []rowcodec.ColInfo primaryColumnIds []int64 execDetail *execDetail + collectNDV bool + prevVals [][]byte } type aggCtx struct { @@ -922,6 +931,7 @@ func (e *closureExecutor) execute() ([]tipb.Chunk, error) { } dbReader := e.dbReader for i, ran := range e.kvRanges { + e.curNdv = 0 if e.isPointGetRange(ran) { val, err := dbReader.Get(ran.StartKey, e.startTS) if err != nil { @@ -932,6 +942,7 @@ func (e *closureExecutor) execute() ([]tipb.Chunk, error) { } if e.counts != nil { e.counts[i]++ + e.ndvs[i] = 1 } err = e.processor.Process(ran.StartKey, val) if err != nil { @@ -947,6 +958,7 @@ func (e *closureExecutor) execute() ([]tipb.Chunk, error) { delta := int64(e.rowCount - oldCnt) if e.counts != nil { e.counts[i] += delta + e.ndvs[i] = e.curNdv } if err != nil { return nil, errors.Trace(err) @@ -1101,6 +1113,7 @@ func (e *tableScanProcessor) Process(key, value []byte) error { return dbreader.ScanBreak } e.rowCount++ + e.curNdv++ err := e.tableScanProcessCore(key, value) if e.scanCtx.chk.NumRows() == chunkMaxRows { err = e.chunkToOldChunk(e.scanCtx.chk) @@ -1252,6 +1265,15 @@ func (e *indexScanProcessor) Finish() error { return e.scanFinish() } +func (isc *idxScanCtx) checkVal(curVals [][]byte) bool { + for i := 0; i < isc.columnLen; i++ { + if !bytes.Equal(isc.prevVals[i], curVals[i]) { + return false + } + } + return true +} + func (e *closureExecutor) indexScanProcessCore(key, value []byte) error { gotRow := false defer func(begin time.Time) { @@ -1268,6 +1290,14 @@ func (e *closureExecutor) indexScanProcessCore(key, value []byte) error { if err != nil { return err } + if e.idxScanCtx.collectNDV { + if len(e.idxScanCtx.prevVals[0]) == 0 || !e.idxScanCtx.checkVal(values) { + e.curNdv++ + for i := 0; i < e.idxScanCtx.columnLen; i++ { + e.idxScanCtx.prevVals[i] = append(e.idxScanCtx.prevVals[i][:0], values[i]...) + } + } + } chk := e.scanCtx.chk decoder := codec.NewDecoder(chk, e.sc.TimeZone) for i, colVal := range values { diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 67328f4f8dc43..1355aadb20fb9 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -109,7 +109,7 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt } closureExec, err := buildClosureExecutor(dagCtx, dagReq, mppCtx) if err != nil { - return buildResp(nil, nil, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return buildResp(nil, nil, nil, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } chunks, err := closureExec.execute() if closureExec.exchangeSenderCtx != nil { @@ -147,7 +147,7 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt } return nil } - return buildResp(chunks, closureExec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return buildResp(chunks, closureExec, closureExec.ndvs, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } func buildDAG(reader *dbreader.DBReader, lockStore *lockstore.MemStore, req *coprocessor.Request) (*dagContext, *tipb.DAGRequest, error) { @@ -352,7 +352,7 @@ func (e *ErrLocked) Error() string { return fmt.Sprintf("key is locked, key: %q, Type: %v, primary: %q, startTS: %v", e.Key, e.LockType, e.Primary, e.StartTS) } -func buildResp(chunks []tipb.Chunk, closureExecutor *closureExecutor, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { +func buildResp(chunks []tipb.Chunk, closureExecutor *closureExecutor, ndvs []int64, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { resp := &coprocessor.Response{} var counts []int64 if closureExecutor != nil { @@ -362,6 +362,7 @@ func buildResp(chunks []tipb.Chunk, closureExecutor *closureExecutor, dagReq *ti Error: toPBError(err), Chunks: chunks, OutputCounts: counts, + Ndvs: ndvs, } executors := dagReq.Executors if dagReq.CollectExecutionSummaries != nil && *dagReq.CollectExecutionSummaries { diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index a3437f3a27a97..1b52f78549678 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -319,7 +319,7 @@ func (tk *TestKit) ResultSetToResult(rs sqlexec.RecordSet, comment check.Comment // ResultSetToResultWithCtx converts sqlexec.RecordSet to testkit.Result. func (tk *TestKit) ResultSetToResultWithCtx(ctx context.Context, rs sqlexec.RecordSet, comment check.CommentInterface) *Result { sRows, err := session.ResultSetToStringSlice(ctx, tk.Se, rs) - tk.c.Check(err, check.IsNil, comment) + tk.c.Check(errors.ErrorStack(err), check.Equals, "", comment) return &Result{rows: sRows, c: tk.c, comment: comment} }