diff --git a/config/config.go b/config/config.go index 9fa4a4be4695d..bda3c05bda1d9 100644 --- a/config/config.go +++ b/config/config.go @@ -52,6 +52,8 @@ const ( DefMaxIndexLength = 3072 // DefMaxOfMaxIndexLength is the maximum index length(in bytes) for TiDB v3.0.7 and previous version. DefMaxOfMaxIndexLength = 3072 * 4 + // DefMinQuotaStatistics is the minimum statistic memory quota(in bytes). + DefMinQuotaStatistics = 32 << 30 // DefPort is the default port of TiDB DefPort = 4000 // DefStatusPort is the default status port of TiDB @@ -98,6 +100,7 @@ type Config struct { TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"` OOMAction string `toml:"oom-action" json:"oom-action"` MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"` + MemQuotaStatistics int64 `toml:"mem-quota-statistics" json:"mem-quota-statistics"` NestedLoopJoinCacheCapacity int64 `toml:"nested-loop-join-cache-capacity" json:"nested-loop-join-cache-capacity"` // TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled // If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error @@ -614,6 +617,7 @@ var defaultConf = Config{ TempStoragePath: tempStorageDirName, OOMAction: OOMActionCancel, MemQuotaQuery: 1 << 30, + MemQuotaStatistics: 32 << 30, NestedLoopJoinCacheCapacity: 20971520, EnableStreaming: false, EnableBatchDML: false, @@ -952,6 +956,9 @@ func (c *Config) Valid() error { if c.PreparedPlanCache.MemoryGuardRatio < 0 || c.PreparedPlanCache.MemoryGuardRatio > 1 { return fmt.Errorf("memory-guard-ratio in [prepared-plan-cache] must be NOT less than 0 and more than 1") } + if c.MemQuotaStatistics < DefMinQuotaStatistics { + return fmt.Errorf("memory-quota-statistics should be greater than %dB", DefMinQuotaStatistics) + } if len(c.IsolationRead.Engines) < 1 { return fmt.Errorf("the number of [isolation-read]engines for isolation read should be at least 1") } diff --git a/config/config.toml.example b/config/config.toml.example index a178bacf00746..60118e65a31ea 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -34,6 +34,10 @@ token-limit = 1000 # The maximum memory available for a single SQL statement. Default: 1GB mem-quota-query = 1073741824 +# The maximum memory limitation for statistics. Default: 32GB +# This value must not be less than 32GB. +mem-quota-statistics = 34359738368 + # The maximum number available of a NLJ cache for a single SQL statement. Default: 20MB nested-loop-join-cache-capacity = 20971520 diff --git a/config/config_test.go b/config/config_test.go index af1e9c64d81a2..bce88c5cd2cdb 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -188,6 +188,7 @@ server-version = "test_version" repair-mode = true max-server-connections = 200 mem-quota-query = 10000 +mem-quota-statistics = 10000 nested-loop-join-cache-capacity = 100 max-index-length = 3080 skip-register-to-dashboard = true @@ -258,6 +259,7 @@ spilled-file-encryption-method = "plaintext" c.Assert(conf.RepairMode, Equals, true) c.Assert(conf.MaxServerConnections, Equals, uint32(200)) c.Assert(conf.MemQuotaQuery, Equals, int64(10000)) + c.Assert(conf.MemQuotaStatistics, Equals, int64(10000)) c.Assert(conf.NestedLoopJoinCacheCapacity, Equals, int64(100)) c.Assert(conf.IsolationRead.Engines, DeepEquals, []string{"tiflash"}) c.Assert(conf.MaxIndexLength, Equals, 3080) diff --git a/config/config_util_test.go b/config/config_util_test.go index 7972fcf706000..1df11a8c5d5f8 100644 --- a/config/config_util_test.go +++ b/config/config_util_test.go @@ -56,6 +56,7 @@ func (s *testConfigSuite) TestMergeConfigItems(c *C) { newConf.Performance.PseudoEstimateRatio = 123 newConf.OOMAction = "panic" newConf.MemQuotaQuery = 123 + newConf.MemQuotaStatistics = 123 newConf.TiKVClient.StoreLimit = 123 // rejected @@ -66,7 +67,7 @@ func (s *testConfigSuite) TestMergeConfigItems(c *C) { as, rs := MergeConfigItems(oldConf, newConf) c.Assert(len(as), Equals, 10) - c.Assert(len(rs), Equals, 3) + c.Assert(len(rs), Equals, 4) for _, a := range as { _, ok := dynamicConfigItems[a] c.Assert(ok, IsTrue) diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index 670afa8e8b49d..abbf560a8281e 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -365,7 +365,7 @@ func (s *testInfoschemaTableSerialSuite) TestDataForTableStatsField(c *C) { defer func() { executor.TableStatsCacheExpiry = oldExpiryTime }() do := s.dom h := do.StatsHandle() - h.Clear() + h.Clear4Test() is := do.InfoSchema() tk := testkit.NewTestKit(c, s.store) @@ -414,7 +414,7 @@ func (s *testInfoschemaTableSerialSuite) TestPartitionsTable(c *C) { defer func() { executor.TableStatsCacheExpiry = oldExpiryTime }() do := s.dom h := do.StatsHandle() - h.Clear() + h.Clear4Test() is := do.InfoSchema() tk := testkit.NewTestKit(c, s.store) diff --git a/executor/simple_test.go b/executor/simple_test.go index 926534468bf88..fca62dedef2d7 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -564,7 +564,7 @@ func (s *testSuite3) TestDropStats(c *C) { c.Assert(err, IsNil) tableInfo := tbl.Meta() h := do.StatsHandle() - h.Clear() + h.Clear4Test() testKit.MustExec("analyze table t") statsTbl := h.GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) diff --git a/go.sum b/go.sum index eb8163809e0e0..744fa5476091a 100644 --- a/go.sum +++ b/go.sum @@ -79,7 +79,9 @@ github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQ github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64/go.mod h1:F86k/6c7aDUdwSUevnLpHS/3Q9hzYCE99jGk2xsHnt0= github.com/coocood/rtutil v0.0.0-20190304133409-c84515f646f2 h1:NnLfQ77q0G4k2Of2c1ceQ0ec6MkLQyDp+IGdVM0D8XM= github.com/coocood/rtutil v0.0.0-20190304133409-c84515f646f2/go.mod h1:7qG7YFnOALvsx6tKTNmQot8d7cGFXM9TidzvRFLWYwM= +github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -566,10 +568,12 @@ github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaW github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.5-pre/go.mod h1:FwP/aQVg39TXzItUBMwnWp9T9gPQnXw4Poh4/oBQZ/0= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181022190402-e5e69e061d4f/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/unrolled/render v0.0.0-20171102162132-65450fb6b2d3/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index ee857e8e44881..4394a6e006a38 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -496,7 +496,7 @@ func (s *testAnalyzeSuite) TestNullCount(c *C) { testKit.MustQuery(input[i]).Check(testkit.Rows(output[i]...)) } h := dom.StatsHandle() - h.Clear() + h.Clear4Test() c.Assert(h.Update(dom.InfoSchema()), IsNil) for i := 2; i < 4; i++ { s.testData.OnRecord(func() { @@ -552,7 +552,7 @@ func (s *testAnalyzeSuite) TestInconsistentEstimation(c *C) { tk.MustExec("analyze table t with 2 buckets") // Force using the histogram to estimate. tk.MustExec("update mysql.stats_histograms set stats_ver = 0") - dom.StatsHandle().Clear() + dom.StatsHandle().Clear4Test() dom.StatsHandle().Update(dom.InfoSchema()) // Using the histogram (a, b) to estimate `a = 5` will get 1.22, while using the CM Sketch to estimate // the `a = 5 and c = 5` will get 10, it is not consistent. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index c6a2ae423e8cc..e8bb45331ca02 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1222,7 +1222,7 @@ func getColumnRangeCounts(sc *stmtctx.StatementContext, colID int64, ranges []*r for i, ran := range ranges { if idxID >= 0 { idxHist := histColl.Indices[idxID] - if idxHist == nil || idxHist.IsInvalid(false) { + if idxHist == nil || idxHist.IsInvalid(sc, false) { return nil, false } count, err = histColl.GetRowCountByIndexRanges(sc, idxID, []*ranger.Range{ran}) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 9873208a81bd1..498c25e3cd59a 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -16,7 +16,6 @@ package core import ( "bytes" "context" - "encoding/binary" "fmt" "strings" "time" @@ -42,7 +41,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" - driver "github.com/pingcap/tidb/types/parser_driver" + "github.com/pingcap/tidb/types/parser_driver" util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -1688,43 +1687,25 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt, opts map[as return p, nil } -var cmSketchSizeLimit = kv.TxnEntrySizeLimit / binary.MaxVarintLen32 - -var analyzeOptionLimit = map[ast.AnalyzeOptionType]uint64{ - ast.AnalyzeOptNumBuckets: 1024, - ast.AnalyzeOptNumTopN: 1024, - ast.AnalyzeOptCMSketchWidth: cmSketchSizeLimit, - ast.AnalyzeOptCMSketchDepth: cmSketchSizeLimit, - ast.AnalyzeOptNumSamples: 100000, -} - -var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ - ast.AnalyzeOptNumBuckets: 256, - ast.AnalyzeOptNumTopN: 20, - ast.AnalyzeOptCMSketchWidth: 2048, - ast.AnalyzeOptCMSketchDepth: 5, - ast.AnalyzeOptNumSamples: 10000, -} - func handleAnalyzeOptions(opts []ast.AnalyzeOpt) (map[ast.AnalyzeOptionType]uint64, error) { - optMap := make(map[ast.AnalyzeOptionType]uint64, len(analyzeOptionDefault)) - for key, val := range analyzeOptionDefault { + optMap := make(map[ast.AnalyzeOptionType]uint64, len(statistics.AnalyzeOptionDefault)) + for key, val := range statistics.AnalyzeOptionDefault { optMap[key] = val } for _, opt := range opts { if opt.Type == ast.AnalyzeOptNumTopN { - if opt.Value > analyzeOptionLimit[opt.Type] { - return nil, errors.Errorf("value of analyze option %s should not larger than %d", ast.AnalyzeOptionString[opt.Type], analyzeOptionLimit[opt.Type]) + if opt.Value > statistics.AnalyzeOptionLimit[opt.Type] { + return nil, errors.Errorf("value of analyze option %s should not larger than %d", ast.AnalyzeOptionString[opt.Type], statistics.AnalyzeOptionLimit[opt.Type]) } } else { - if opt.Value == 0 || opt.Value > analyzeOptionLimit[opt.Type] { - return nil, errors.Errorf("value of analyze option %s should be positive and not larger than %d", ast.AnalyzeOptionString[opt.Type], analyzeOptionLimit[opt.Type]) + if opt.Value == 0 || opt.Value > statistics.AnalyzeOptionLimit[opt.Type] { + return nil, errors.Errorf("value of analyze option %s should be positive and not larger than %d", ast.AnalyzeOptionString[opt.Type], statistics.AnalyzeOptionLimit[opt.Type]) } } optMap[opt.Type] = opt.Value } - if optMap[ast.AnalyzeOptCMSketchWidth]*optMap[ast.AnalyzeOptCMSketchDepth] > cmSketchSizeLimit { - return nil, errors.Errorf("cm sketch size(depth * width) should not larger than %d", cmSketchSizeLimit) + if optMap[ast.AnalyzeOptCMSketchWidth]*optMap[ast.AnalyzeOptCMSketchDepth] > statistics.CMSketchSizeLimit { + return nil, errors.Errorf("cm sketch size(depth * width) should not larger than %d", statistics.CMSketchSizeLimit) } return optMap, nil } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 60abf5bfb176c..b25b8a46d7a86 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -858,6 +858,7 @@ func NewSessionVars() *SessionVars { } vars.MemQuota = MemQuota{ MemQuotaQuery: config.GetGlobalConfig().MemQuotaQuery, + MemQuotaStatistics: config.GetGlobalConfig().MemQuotaStatistics, NestedLoopJoinCacheCapacity: config.GetGlobalConfig().NestedLoopJoinCacheCapacity, // The variables below do not take any effect anymore, it's remaining for compatibility. @@ -1303,6 +1304,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.InitChunkSize = tidbOptPositiveInt32(val, DefInitChunkSize) case TIDBMemQuotaQuery: s.MemQuotaQuery = tidbOptInt64(val, config.GetGlobalConfig().MemQuotaQuery) + case TIDBMemQuotaStatistics: + s.MemQuotaStatistics = tidbOptInt64(val, config.GetGlobalConfig().MemQuotaStatistics) case TIDBNestedLoopJoinCacheCapacity: s.NestedLoopJoinCacheCapacity = tidbOptInt64(val, config.GetGlobalConfig().NestedLoopJoinCacheCapacity) case TIDBMemQuotaHashJoin: @@ -1728,7 +1731,8 @@ func (c *Concurrency) UnionConcurrency() int { type MemQuota struct { // MemQuotaQuery defines the memory quota for a query. MemQuotaQuery int64 - + // MemQuotaStatistics defines the memory quota for the statistic Cache. + MemQuotaStatistics int64 // NestedLoopJoinCacheCapacity defines the memory capacity for apply cache. NestedLoopJoinCacheCapacity int64 diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bb1314269ba05..0ee616108fc3e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -683,9 +683,11 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBMaxChunkSize, Value: strconv.Itoa(DefMaxChunkSize)}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowBatchCop, Value: strconv.Itoa(DefTiDBAllowBatchCop)}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBInitChunkSize, Value: strconv.Itoa(DefInitChunkSize)}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableCascadesPlanner, Value: "0", Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMerge, Value: "0", Type: TypeBool}, {Scope: ScopeSession, Name: TIDBMemQuotaQuery, Value: strconv.FormatInt(config.GetGlobalConfig().MemQuotaQuery, 10), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, + {Scope: ScopeGlobal, Name: TIDBMemQuotaStatistics, Value: strconv.FormatInt(config.GetGlobalConfig().MemQuotaStatistics, 10), Type: TypeInt, MinValue: int64(32 << 30), MaxValue: math.MaxInt64}, {Scope: ScopeSession, Name: TIDBMemQuotaHashJoin, Value: strconv.FormatInt(DefTiDBMemQuotaHashJoin, 10), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, {Scope: ScopeSession, Name: TIDBMemQuotaMergeJoin, Value: strconv.FormatInt(DefTiDBMemQuotaMergeJoin, 10), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, {Scope: ScopeSession, Name: TIDBMemQuotaSort, Value: strconv.FormatInt(DefTiDBMemQuotaSort, 10), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, @@ -695,6 +697,7 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeSession, Name: TIDBMemQuotaNestedLoopApply, Value: strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, {Scope: ScopeSession, Name: TiDBEnableStreaming, Value: "0", Type: TypeBool}, {Scope: ScopeSession, Name: TiDBEnableChunkRPC, Value: "1", Type: TypeBool}, + {Scope: ScopeSession, Name: TxnIsolationOneShot, Value: ""}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTablePartition, Value: "on"}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinConcurrency, Value: strconv.Itoa(DefTiDBHashJoinConcurrency)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index a3f890e7ef017..4415512766d7d 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -96,6 +96,7 @@ const ( // The following session variables controls the memory quota during query execution. // "tidb_mem_quota_query": control the memory quota of a query. TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes. + TIDBMemQuotaStatistics = "tidb_mem_quota_statistics" TIDBNestedLoopJoinCacheCapacity = "tidb_nested_loop_join_cache_capacity" // TODO: remove them below sometime, it should have only one Quota(TIDBMemQuotaQuery). TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes. diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 946d12bd86cb8..dd4a0db50c550 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -84,6 +84,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize) c.Assert(vars.DMLBatchSize, Equals, DefDMLBatchSize) c.Assert(vars.MemQuotaQuery, Equals, config.GetGlobalConfig().MemQuotaQuery) + c.Assert(vars.MemQuotaStatistics, Equals, config.GetGlobalConfig().MemQuotaStatistics) c.Assert(vars.MemQuotaHashJoin, Equals, int64(DefTiDBMemQuotaHashJoin)) c.Assert(vars.MemQuotaMergeJoin, Equals, int64(DefTiDBMemQuotaMergeJoin)) c.Assert(vars.MemQuotaSort, Equals, int64(DefTiDBMemQuotaSort)) diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go index f545fda4741cd..a896c3e9e18d9 100644 --- a/statistics/handle/bootstrap.go +++ b/statistics/handle/bootstrap.go @@ -15,10 +15,12 @@ package handle import ( "context" + "encoding/binary" "fmt" "github.com/cznic/mathutil" "github.com/pingcap/errors" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -32,7 +34,11 @@ import ( "go.uber.org/zap" ) -func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) { +// defaultCMSAndHistSize is the default statistics data size for one (CMSKetch + Histogram). +var defaultCMSAndHistSize = int64(statistics.AnalyzeOptionDefault[ast.AnalyzeOptCMSketchWidth]*statistics.AnalyzeOptionDefault[ast.AnalyzeOptCMSketchDepth]*binary.MaxVarintLen32) + + int64(statistics.AnalyzeOptionDefault[ast.AnalyzeOptNumBuckets]*2*binary.MaxVarintLen64) + +func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { physicalID := row.GetInt64(1) table, ok := h.getTableByPhysicalID(is, physicalID) @@ -54,43 +60,45 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache Version: row.GetUint64(0), Name: getFullTableName(is, tableInfo), } - cache.tables[physicalID] = tbl + // Ignore the memory usage, it will be calculated later. + tables[tbl.PhysicalID] = tbl } } -func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) { +func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (map[int64]*statistics.Table, error) { sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { defer terror.Call(rc[0].Close) } if err != nil { - return statsCache{}, errors.Trace(err) + return nil, errors.Trace(err) } - tables := statsCache{tables: make(map[int64]*statistics.Table)} + tables := make(map[int64]*statistics.Table) + req := rc[0].NewChunk() iter := chunk.NewIterator4Chunk(req) for { err := rc[0].Next(context.TODO(), req) if err != nil { - return statsCache{}, errors.Trace(err) + return nil, errors.Trace(err) } if req.NumRows() == 0 { break } - h.initStatsMeta4Chunk(is, &tables, iter) + h.initStatsMeta4Chunk(is, tables, iter) } return tables, nil } -func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) { +func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { - table, ok := cache.tables[row.GetInt64(0)] + table, ok := tables[row.GetInt64(0)] if !ok { continue } - id, ndv, nullCount, version, totColSize := row.GetInt64(2), row.GetInt64(3), row.GetInt64(5), row.GetUint64(4), row.GetInt64(7) - lastAnalyzePos := row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)) + id, ndv, nullCount, version, totColSize := row.GetInt64(2), row.GetInt64(3), row.GetInt64(5), row.GetUint64(4), row.GetInt64(6) + lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob)) tbl, _ := h.getTableByPhysicalID(is, table.PhysicalID) if row.GetInt64(1) > 0 { var idxInfo *model.IndexInfo @@ -103,18 +111,13 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat if idxInfo == nil { continue } - cms, err := statistics.DecodeCMSketch(row.GetBytes(6), nil) - if err != nil { - cms = nil - terror.Log(errors.Trace(err)) - } - hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0) + hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), 0, 0) index := &statistics.Index{ - Histogram: *hist, - CMSketch: cms, - Info: idxInfo, - StatsVer: row.GetInt64(8), - Flag: row.GetInt64(10), + Histogram: *hist, + PhysicalID: table.PhysicalID, + Info: idxInfo, + StatsVer: row.GetInt64(7), + Flag: row.GetInt64(9), } lastAnalyzePos.Copy(&index.LastAnalyzePos) table.Indices[hist.ID] = index @@ -130,14 +133,14 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat continue } hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize) - hist.Correlation = row.GetFloat64(9) + hist.Correlation = row.GetFloat64(8) col := &statistics.Column{ Histogram: *hist, PhysicalID: table.PhysicalID, Info: colInfo, Count: nullCount, IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag), - Flag: row.GetInt64(10), + Flag: row.GetInt64(9), } lastAnalyzePos.Copy(&col.LastAnalyzePos) table.Columns[hist.ID] = col @@ -145,8 +148,72 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat } } -func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error { - sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" +// initStatsHistograms loads ALL the meta data except cm_sketch. +func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables map[int64]*statistics.Table) error { + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version," + + " null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos " + + "from mysql.stats_histograms" + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + if len(rc) > 0 { + defer terror.Call(rc[0].Close) + } + if err != nil { + return errors.Trace(err) + } + req := rc[0].NewChunk() + iter := chunk.NewIterator4Chunk(req) + for { + err := rc[0].Next(context.TODO(), req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsHistograms4Chunk(is, tables, iter) + } + return nil +} + +func (h *Handle) initCMSketch4Indices4Chunk(is infoschema.InfoSchema, tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) { + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + table, ok := tables[row.GetInt64(0)] + if !ok { + continue + } + id := row.GetInt64(2) + tbl, _ := h.getTableByPhysicalID(is, table.PhysicalID) + if row.GetInt64(1) > 0 { + var idxInfo *model.IndexInfo + for _, idx := range tbl.Meta().Indices { + if idx.ID == id { + idxInfo = idx + break + } + } + if idxInfo == nil { + continue + } + idx := table.Indices[id] + if idx == nil { + continue + } + cms, err := statistics.DecodeCMSketch(row.GetBytes(3), nil) + if err != nil { + cms = nil + terror.Log(errors.Trace(err)) + } + idx.CMSketch = cms + } + } +} + +func (h *Handle) initCMSketch4Indices(is infoschema.InfoSchema, tables map[int64]*statistics.Table) error { + // indcies should be loaded first + limitSize := h.mu.ctx.GetSessionVars().MemQuotaStatistics / defaultCMSAndHistSize + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, cm_sketch " + + "from mysql.stats_histograms where is_index = 1 " + + fmt.Sprintf("order by table_id, hist_id limit %d", limitSize) rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { defer terror.Call(rc[0].Close) @@ -164,18 +231,19 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache if req.NumRows() == 0 { break } - h.initStatsHistograms4Chunk(is, cache, iter) + h.initCMSketch4Indices4Chunk(is, tables, iter) } return nil } -func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chunk) { +func (h *Handle) initStatsTopN4Chunk(tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { - table, ok := cache.tables[row.GetInt64(0)] + table, ok := tables[row.GetInt64(0)] if !ok { continue } idx, ok := table.Indices[row.GetInt64(1)] + // If idx.CMSketch == nil, the index is not loaded. if !ok || idx.CMSketch == nil { continue } @@ -185,8 +253,59 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu } } -func (h *Handle) initStatsTopN(cache *statsCache) error { - sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1" +func (h *Handle) initStatsTopN(tables map[int64]*statistics.Table) error { + limitSize := (h.mu.ctx.GetSessionVars().MemQuotaStatistics / defaultCMSAndHistSize) * int64(statistics.AnalyzeOptionDefault[ast.AnalyzeOptNumTopN]) + sql := "select HIGH_PRIORITY table_id, hist_id, value, count " + + "from mysql.stats_top_n " + + fmt.Sprintf("where is_index = 1 order by table_id, hist_id limit %d", limitSize) + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + if len(rc) > 0 { + defer terror.Call(rc[0].Close) + } + if err != nil { + return errors.Trace(err) + } + req := rc[0].NewChunk() + iter := chunk.NewIterator4Chunk(req) + for { + err := rc[0].Next(context.TODO(), req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsTopN4Chunk(tables, iter) + } + return nil +} + +func initColumnCountMeta4Chunk(tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) error { + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + tableID, histID, decimalCount := row.GetInt64(0), row.GetInt64(1), row.GetMyDecimal(2) + table, ok := tables[tableID] + count, err := decimalCount.ToInt() + if !ok { + continue + } + if err != nil { + return err + } + column, ok := table.Columns[histID] + if !ok { + continue + } + column.Count += count + } + return nil +} + +// initColumnCount loads row count for each column. +func (h *Handle) initColumnCount(tables map[int64]*statistics.Table) (err error) { + sql := "select HIGH_PRIORITY table_id, hist_id, sum(count) " + + "from mysql.stats_buckets " + + "where is_index = 0 " + + "group by table_id, hist_id " rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { defer terror.Call(rc[0].Close) @@ -204,15 +323,18 @@ func (h *Handle) initStatsTopN(cache *statsCache) error { if req.NumRows() == 0 { break } - h.initStatsTopN4Chunk(cache, iter) + err = initColumnCountMeta4Chunk(tables, iter) + if err != nil { + return err + } } return nil } -func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) { +func initStatsBuckets4Chunk(ctx sessionctx.Context, tables map[int64]*statistics.Table, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2) - table, ok := cache.tables[tableID] + table, ok := tables[tableID] if !ok { continue } @@ -220,18 +342,14 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu var hist *statistics.Histogram if isIndex > 0 { index, ok := table.Indices[histID] - if !ok { + if !ok || index.CMSketch == nil { continue } hist = &index.Histogram lower, upper = types.NewBytesDatum(row.GetBytes(5)), types.NewBytesDatum(row.GetBytes(6)) } else { column, ok := table.Columns[histID] - if !ok { - continue - } - column.Count += row.GetInt64(3) - if !mysql.HasPriKeyFlag(column.Info.Flag) { + if !ok || !mysql.HasPriKeyFlag(column.Info.Flag) { continue } hist = &column.Histogram @@ -253,10 +371,14 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu } hist.AppendBucket(&lower, &upper, row.GetInt64(3), row.GetInt64(4)) } + } -func (h *Handle) initStatsBuckets(cache *statsCache) error { - sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" +func (h *Handle) initStatsBuckets(tables map[int64]*statistics.Table) (err error) { + limitSize := (h.mu.ctx.GetSessionVars().MemQuotaStatistics / defaultCMSAndHistSize) * int64(statistics.AnalyzeOptionDefault[ast.AnalyzeOptNumBuckets]) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound," + + "upper_bound from mysql.stats_buckets " + + fmt.Sprintf("order by is_index desc, table_id, hist_id, bucket_id limit %d", limitSize) rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { defer terror.Call(rc[0].Close) @@ -266,18 +388,31 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error { } req := rc[0].NewChunk() iter := chunk.NewIterator4Chunk(req) + var lastTableID int64 = -1 + var totalRowsCnt int64 for { err := rc[0].Next(context.TODO(), req) + totalRowsCnt += int64(req.NumRows()) if err != nil { return errors.Trace(err) } if req.NumRows() == 0 { + if limitSize <= totalRowsCnt && lastTableID != -1 { + // remove the stats buckets of the last table_id because it may + // not be loaded fully. + tables[lastTableID] = tables[lastTableID].CopyWithoutBucketsAndCMS() + } break } - initStatsBuckets4Chunk(h.mu.ctx, cache, iter) + lastTableID = req.GetRow(req.NumRows() - 1).GetInt64(0) + initStatsBuckets4Chunk(h.mu.ctx, tables, iter) } - lastVersion := uint64(0) - for _, table := range cache.tables { + return nil +} + +func (h *Handle) preCalcScalar4StatsBuckets(tables map[int64]*statistics.Table) (lastVersion uint64, err error) { + lastVersion = uint64(0) + for _, table := range tables { lastVersion = mathutil.MaxUint64(lastVersion, table.Version) for _, idx := range table.Indices { for i := 1; i < idx.Len(); i++ { @@ -292,8 +427,7 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error { col.PreCalculateScalar() } } - cache.version = lastVersion - return nil + return lastVersion, nil } // InitStats will init the stats cache using full load strategy. @@ -310,24 +444,35 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { if err != nil { return err } - cache, err := h.initStatsMeta(is) + tables, err := h.initStatsMeta(is) + if err != nil { + return errors.Trace(err) + } + err = h.initStatsHistograms(is, tables) if err != nil { return errors.Trace(err) } - err = h.initStatsHistograms(is, &cache) + err = h.initCMSketch4Indices(is, tables) if err != nil { return errors.Trace(err) } - err = h.initStatsTopN(&cache) + err = h.initStatsTopN(tables) if err != nil { return err } - err = h.initStatsBuckets(&cache) + err = h.initColumnCount(tables) + if err != nil { + return errors.Trace(err) + } + err = h.initStatsBuckets(tables) + if err != nil { + return errors.Trace(err) + } + version, err := h.preCalcScalar4StatsBuckets(tables) if err != nil { return errors.Trace(err) } - cache.initMemoryUsage() - h.updateStatsCache(cache) + h.statsCache.initStatsCache(tables, version) return nil } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index d68181e0e8cab..9f4cde719a3ec 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -98,7 +98,7 @@ PARTITION BY RANGE ( a ) ( tk.MustExec("delete from mysql.stats_meta") tk.MustExec("delete from mysql.stats_histograms") tk.MustExec("delete from mysql.stats_buckets") - h.Clear() + h.Clear4Test() err = h.LoadStatsFromJSON(s.do.InfoSchema(), jsonTbl) c.Assert(err, IsNil) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 1322068479a11..11b381a7a8f1c 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "sync" - "sync/atomic" "time" "github.com/cznic/mathutil" @@ -40,20 +39,11 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sqlexec" atomic2 "go.uber.org/atomic" "go.uber.org/zap" ) -// statsCache caches the tables in memory for Handle. -type statsCache struct { - tables map[int64]*statistics.Table - // version is the latest version of cache. - version uint64 - memUsage int64 -} - // Handle can update stats info periodically. type Handle struct { mu struct { @@ -69,11 +59,7 @@ type Handle struct { // It can be read by multiple readers at the same time without acquiring lock, but it can be // written only after acquiring the lock. - statsCache struct { - sync.Mutex - atomic.Value - memTracker *memory.Tracker - } + statsCache *statsCache restrictedExec sqlexec.RestrictedSQLExecutor @@ -90,13 +76,11 @@ type Handle struct { lease atomic2.Duration } -// Clear the statsCache, only for test. -func (h *Handle) Clear() { +// Clear4Test the statsCache, only for test. +func (h *Handle) Clear4Test() { h.mu.Lock() - h.statsCache.Lock() - h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) - h.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1) - h.statsCache.Unlock() + h.SetBytesLimit4Test(h.mu.ctx.GetSessionVars().MemQuotaStatistics) + h.statsCache.Clear() for len(h.ddlEventCh) > 0 { <-h.ddlEventCh } @@ -124,10 +108,9 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) (*Handle, error) { if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { handle.restrictedExec = exec } - handle.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1) + handle.statsCache = newStatsCache(ctx.GetSessionVars().MemQuotaStatistics) handle.mu.ctx = ctx handle.mu.rateMap = make(errorRateDeltaMap) - handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) err := handle.RefreshVars() if err != nil { return nil, err @@ -160,15 +143,14 @@ func DurationToTS(d time.Duration) uint64 { // Update reads stats meta from store and updates the stats map. func (h *Handle) Update(is infoschema.InfoSchema) error { - oldCache := h.statsCache.Load().(statsCache) - lastVersion := oldCache.version + lastVersion := h.statsCache.GetVersion() // We need this because for two tables, the smaller version may write later than the one with larger version. // Consider the case that there are two tables A and B, their version and commit time is (A0, A1) and (B0, B1), // and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read // the table stats of A0 if we read stats that greater than lastVersion which is B0. // We can read the stats if the diff between commit time and version is less than three lease. offset := DurationToTS(3 * h.Lease()) - if oldCache.version >= offset { + if lastVersion >= offset { lastVersion = lastVersion - offset } else { lastVersion = 0 @@ -212,7 +194,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { tbl.Name = getFullTableName(is, tableInfo) tables = append(tables, tbl) } - h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion)) + h.statsCache.Update(tables, deletedTableIDs, lastVersion) return nil } @@ -246,17 +228,25 @@ func buildPartitionID2TableID(is infoschema.InfoSchema) map[int64]int64 { // GetMemConsumed returns the mem size of statscache consumed func (h *Handle) GetMemConsumed() (size int64) { + h.statsCache.mu.Lock() size = h.statsCache.memTracker.BytesConsumed() + h.statsCache.mu.Unlock() return } -// GetAllTableStatsMemUsage get all the mem usage with true table. -// only used by test. -func (h *Handle) GetAllTableStatsMemUsage() int64 { - data := h.statsCache.Value.Load().(statsCache) - cache := data.copy() +// EraseTable4Test erase a table by ID and add new empty (with Meta) table. +// ONLY used for test. +func (h *Handle) EraseTable4Test(ID int64) { + table, _ := h.statsCache.Lookup(ID) + h.statsCache.Insert(table.CopyWithoutBucketsAndCMS()) +} + +// GetAllTableStatsMemUsage4Test get all the mem usage with true table. +// ONLY used for test. +func (h *Handle) GetAllTableStatsMemUsage4Test() int64 { + data := h.statsCache.GetAll() allUsage := int64(0) - for _, t := range cache.tables { + for _, t := range data { allUsage += t.MemoryUsage() } return allUsage @@ -269,72 +259,49 @@ func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table { // GetPartitionStats retrieves the partition stats from cache. func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table { - statsCache := h.statsCache.Load().(statsCache) - tbl, ok := statsCache.tables[pid] + tbl, ok := h.statsCache.Lookup(pid) if !ok { tbl = statistics.PseudoTable(tblInfo) tbl.PhysicalID = pid - h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) + h.statsCache.Update([]*statistics.Table{tbl}, nil, h.statsCache.GetVersion()) return tbl } return tbl } -func (h *Handle) updateStatsCache(newCache statsCache) { - h.statsCache.Lock() - oldCache := h.statsCache.Load().(statsCache) - if oldCache.version <= newCache.version { - h.statsCache.memTracker.Consume(newCache.memUsage - oldCache.memUsage) - h.statsCache.Store(newCache) - } - h.statsCache.Unlock() -} - -func (sc statsCache) copy() statsCache { - newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)), - version: sc.version, - memUsage: sc.memUsage} - for k, v := range sc.tables { - newCache.tables[k] = v - } - return newCache -} - -//initMemoryUsage calc total memory usage of statsCache and set statsCache.memUsage -//should be called after the tables and their stats are initilazed -func (sc statsCache) initMemoryUsage() { - sum := int64(0) - for _, tb := range sc.tables { - sum += tb.MemoryUsage() - } - sc.memUsage = sum - return +// SetBytesLimit4Test sets the bytes limit for this tracker. "bytesLimit <= 0" means no limit. +// Only used for test. +func (h *Handle) SetBytesLimit4Test(bytesLimit int64) { + h.statsCache.mu.Lock() + h.statsCache.memTracker.SetBytesLimit(bytesLimit) + h.statsCache.memCapacity = bytesLimit + h.statsCache.mu.Unlock() } -// update updates the statistics table cache using copy on write. -func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache { - newCache := sc.copy() - newCache.version = newVersion - for _, tbl := range tables { - id := tbl.PhysicalID - if ptbl, ok := newCache.tables[id]; ok { - newCache.memUsage -= ptbl.MemoryUsage() - } - newCache.tables[id] = tbl - newCache.memUsage += tbl.MemoryUsage() +// CanRuntimePrune indicates whether tbl support runtime prune for table and first partition id. +func (h *Handle) CanRuntimePrune(tid, p0Id int64) bool { + if h == nil { + return false } - for _, id := range deletedIDs { - if ptbl, ok := newCache.tables[id]; ok { - newCache.memUsage -= ptbl.MemoryUsage() - } - delete(newCache.tables, id) + if tid == p0Id { + return false + } + _, tblExists := h.statsCache.Lookup(tid) + if tblExists { + return true } - return newCache + _, partExists := h.statsCache.Lookup(p0Id) + if !partExists { + return true + } + return false + } // LoadNeededHistograms will load histograms for those needed columns. func (h *Handle) LoadNeededHistograms() (err error) { cols := statistics.HistogramNeededColumns.AllCols() + idxs := statistics.HistogramNeededIndices.AllIdxs() reader, err := h.getStatsReader(nil) if err != nil { return err @@ -348,8 +315,7 @@ func (h *Handle) LoadNeededHistograms() (err error) { }() for _, col := range cols { - statsCache := h.statsCache.Load().(statsCache) - tbl, ok := statsCache.tables[col.TableID] + tbl, ok := h.statsCache.Lookup(col.TableID) if !ok { continue } @@ -375,21 +341,51 @@ func (h *Handle) LoadNeededHistograms() (err error) { Count: int64(hg.TotalRowCount()), IsHandle: c.IsHandle, } - h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) + h.statsCache.Update([]*statistics.Table{tbl}, nil, h.statsCache.GetVersion()) statistics.HistogramNeededColumns.Delete(col) } + + for _, pidx := range idxs { + tbl, ok := h.statsCache.Lookup(pidx.TableID) + if !ok { + continue + } + tbl = tbl.Copy() + idx, ok := tbl.Indices[pidx.IndexID] + if !ok || idx.Len() > 0 { + statistics.HistogramNeededIndices.Delete(pidx) + continue + } + hg, err := h.histogramFromStorage(reader, pidx.TableID, idx.ID, types.NewFieldType(mysql.TypeBlob), idx.NDV, 1, idx.LastUpdateVersion, idx.NullCount, 0, 0) + if err != nil { + return errors.Trace(err) + } + cms, err := h.cmSketchFromStorage(reader, pidx.TableID, 1, pidx.IndexID) + if err != nil { + return errors.Trace(err) + } + tbl.Indices[idx.ID] = &statistics.Index{ + Histogram: *hg, + CMSketch: cms, + PhysicalID: pidx.TableID, + Info: idx.Info, + StatsVer: idx.StatsVer, + Flag: idx.Flag, + } + h.statsCache.Update([]*statistics.Table{tbl}, nil, h.statsCache.GetVersion()) + statistics.HistogramNeededIndices.Delete(pidx) + } return nil } // LastUpdateVersion gets the last update version. func (h *Handle) LastUpdateVersion() uint64 { - return h.statsCache.Load().(statsCache).version + return h.statsCache.GetVersion() } // SetLastUpdateVersion sets the last update version. func (h *Handle) SetLastUpdateVersion(version uint64) { - statsCache := h.statsCache.Load().(statsCache) - h.updateStatsCache(statsCache.update(nil, nil, version)) + h.statsCache.Update(nil, nil, version) } // FlushStats flushes the cached stats update into store. @@ -449,7 +445,7 @@ func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table if err != nil { return errors.Trace(err) } - idx = &statistics.Index{Histogram: *hg, CMSketch: cms, Info: idxInfo, ErrorRate: errorRate, StatsVer: row.GetInt64(7), Flag: flag} + idx = &statistics.Index{Histogram: *hg, CMSketch: cms, Info: idxInfo, ErrorRate: errorRate, StatsVer: row.GetInt64(7), Flag: flag, PhysicalID: table.PhysicalID} lastAnalyzePos.Copy(&idx.LastAnalyzePos) } break @@ -562,7 +558,8 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in err = err1 } }() - table, ok := h.statsCache.Load().(statsCache).tables[physicalID] + table, ok := h.statsCache.Lookup(physicalID) + // If table stats is pseudo, we also need to copy it, since we will use the column stats when // the average error rate of it is small. if !ok || historyStatsExec != nil { @@ -954,10 +951,11 @@ func (h *Handle) ReloadExtendedStatistics() error { if err != nil { return err } - oldCache := h.statsCache.Load().(statsCache) - tables := make([]*statistics.Table, 0, len(oldCache.tables)) - for physicalID, tbl := range oldCache.tables { - t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true) + allTables := h.statsCache.GetAll() + tables := make([]*statistics.Table, 0, len(allTables)) + for _, tbl := range allTables { + t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), tbl.PhysicalID, true) + if err != nil { return err } @@ -968,7 +966,7 @@ func (h *Handle) ReloadExtendedStatistics() error { return err } // Note that this update may fail when the statsCache.version has been modified by others. - h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) + h.statsCache.Update(tables, nil, h.statsCache.GetVersion()) return nil } diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index e5c3d6e573182..a56f068a4d047 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -52,7 +52,7 @@ func cleanEnv(c *C, store kv.Storage, do *domain.Domain) { tk.MustExec("delete from mysql.stats_histograms") tk.MustExec("delete from mysql.stats_buckets") tk.MustExec("delete from mysql.stats_extended") - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() } func (s *testStatsSuite) TestStatsCache(c *C) { @@ -85,7 +85,7 @@ func (s *testStatsSuite) TestStatsCache(c *C) { // If the new schema drop a column, the table stats can still work. testKit.MustExec("alter table t drop column c2") is = do.InfoSchema() - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) @@ -94,7 +94,7 @@ func (s *testStatsSuite) TestStatsCache(c *C) { testKit.MustExec("alter table t add column c10 int") is = do.InfoSchema() - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) @@ -117,7 +117,7 @@ func (s *testStatsSuite) TestStatsCacheMemTracker(c *C) { statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl.MemoryUsage() > 0, IsTrue) - c.Assert(do.StatsHandle().GetAllTableStatsMemUsage(), Equals, do.StatsHandle().GetMemConsumed()) + c.Assert(do.StatsHandle().GetAllTableStatsMemUsage4Test(), Equals, do.StatsHandle().GetMemConsumed()) statsTbl = do.StatsHandle().GetTableStats(tableInfo) @@ -139,23 +139,24 @@ func (s *testStatsSuite) TestStatsCacheMemTracker(c *C) { // If the new schema drop a column, the table stats can still work. testKit.MustExec("alter table t drop column c2") is = do.InfoSchema() - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) - c.Assert(statsTbl.MemoryUsage() > 0, IsTrue) - c.Assert(do.StatsHandle().GetAllTableStatsMemUsage(), Equals, do.StatsHandle().GetMemConsumed()) + c.Assert(statsTbl.MemoryUsage() >= 0, IsTrue) + + c.Assert(do.StatsHandle().GetAllTableStatsMemUsage4Test(), Equals, do.StatsHandle().GetMemConsumed()) c.Assert(statsTbl.Pseudo, IsFalse) // If the new schema add a column, the table stats can still work. testKit.MustExec("alter table t add column c10 int") is = do.InfoSchema() - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) - c.Assert(do.StatsHandle().GetAllTableStatsMemUsage(), Equals, do.StatsHandle().GetMemConsumed()) + c.Assert(do.StatsHandle().GetAllTableStatsMemUsage4Test(), Equals, do.StatsHandle().GetMemConsumed()) } func assertTableEqual(c *C, a *statistics.Table, b *statistics.Table) { @@ -231,7 +232,7 @@ func (s *testStatsSuite) TestStatsStoreAndLoad(c *C) { testKit.MustExec("analyze table t") statsTbl1 := do.StatsHandle().GetTableStats(tableInfo) - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl2 := do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl2.Pseudo, IsFalse) @@ -276,7 +277,7 @@ func (s *testStatsSuite) TestColumnIDs(c *C) { // Drop a column and the offset changed, testKit.MustExec("alter table t drop column c1") is = do.InfoSchema() - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) @@ -494,14 +495,14 @@ func (s *testStatsSuite) TestInitStats(c *C) { // `Lease` is not 0, so here we just change it. h.SetLease(time.Millisecond) - h.Clear() + h.Clear4Test() c.Assert(h.InitStats(is), IsNil) table0 := h.GetTableStats(tbl.Meta()) cols := table0.Columns c.Assert(cols[1].LastAnalyzePos.GetBytes()[0], Equals, uint8(0x36)) c.Assert(cols[2].LastAnalyzePos.GetBytes()[0], Equals, uint8(0x37)) c.Assert(cols[3].LastAnalyzePos.GetBytes()[0], Equals, uint8(0x38)) - h.Clear() + h.Clear4Test() c.Assert(h.Update(is), IsNil) table1 := h.GetTableStats(tbl.Meta()) assertTableEqual(c, table0, table1) @@ -686,7 +687,7 @@ func (s *testStatsSuite) TestExtendedStatsOps(c *C) { c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 0) tk.MustExec("update mysql.stats_extended set status = 1 where stats_name = 's1' and db = 'test'") - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl, NotNil) @@ -726,7 +727,7 @@ func (s *testStatsSuite) TestAdminReloadStatistics(c *C) { c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 0) tk.MustExec("update mysql.stats_extended set status = 1 where stats_name = 's1' and db = 'test'") - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() do.StatsHandle().Update(is) statsTbl = do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl, NotNil) diff --git a/statistics/handle/statscache.go b/statistics/handle/statscache.go new file mode 100644 index 0000000000000..38dd17038034b --- /dev/null +++ b/statistics/handle/statscache.go @@ -0,0 +1,170 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "encoding/binary" + "sync" + + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/util/kvcache" + "github.com/pingcap/tidb/util/memory" +) + +// statsCache caches table statistics. +type statsCache struct { + mu sync.Mutex + cache *kvcache.SimpleLRUCache + memCapacity int64 + version uint64 + memTracker *memory.Tracker +} + +type statsCacheKey int64 + +func (key statsCacheKey) Hash() []byte { + var buf = make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(key)) + return buf +} + +// newStatsCache returns a new statsCache with capacity maxMemoryLimit. +func newStatsCache(memoryLimit int64) *statsCache { + // Since newStatsCache controls the memory usage by itself, set the capacity of + // the underlying LRUCache to max to close its memory control + cache := kvcache.NewSimpleLRUCache(uint(memoryLimit), 0.1, 0) + c := statsCache{ + cache: cache, + memCapacity: memoryLimit, + memTracker: memory.NewTracker(memory.LabelForStatsCache, -1), + } + return &c +} + +// Clear clears the statsCache. +func (sc *statsCache) Clear() { + // Since newStatsCache controls the memory usage by itself, set the capacity of + // the underlying LRUCache to max to close its memory control + sc.mu.Lock() + defer sc.mu.Unlock() + cache := kvcache.NewSimpleLRUCache(uint(sc.memCapacity), 0.1, 0) + sc.memTracker.ReplaceBytesUsed(0) + sc.cache = cache + sc.version = 0 +} + +// GetAll get all the tables point. +func (sc *statsCache) GetAll() []*statistics.Table { + sc.mu.Lock() + defer sc.mu.Unlock() + values := sc.cache.GetAll() + tables := make([]*statistics.Table, 0) + for _, v := range values { + if t, ok := v.(*statistics.Table); ok && t != nil { + tables = append(tables, t) + } + } + return tables +} + +// lookupUnsafe get table with id without Lock. +func (sc *statsCache) lookupUnsafe(id int64) (*statistics.Table, bool) { + var key = statsCacheKey(id) + value, hit := sc.cache.Get(key) + if !hit { + return nil, false + } + table := value.(*statistics.Table) + return table, true +} + +// Lookup get table with id. +func (sc *statsCache) Lookup(id int64) (*statistics.Table, bool) { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.lookupUnsafe(id) +} + +// Insert inserts a new table to the statsCache. +// If the memory consumption exceeds the capacity, remove the buckets and +// CMSketch of the oldest cache and add metadata of it +func (sc *statsCache) Insert(table *statistics.Table) { + if table == nil { + return + } + var key = statsCacheKey(table.PhysicalID) + mem := table.MemoryUsage() + // We do not need to check whether mem > sc.memCapacity, because the lower + // bound of statistics is set, it's almost impossible the stats memory usage + // of one table exceeds the capacity. + for mem+sc.memTracker.BytesConsumed() > sc.memCapacity { + evictedKey, evictedValue, evicted := sc.cache.RemoveOldest() + if !evicted { + return + } + sc.memTracker.Consume(-evictedValue.(*statistics.Table).MemoryUsage()) + sc.cache.Put(evictedKey, evictedValue.(*statistics.Table).CopyWithoutBucketsAndCMS()) + } + // erase the old element since the value may be different from the existing one. + sc.Erase(table.PhysicalID) + sc.cache.Put(key, table) + sc.memTracker.Consume(mem) + return +} + +// Erase erase a stateCache with physical id. +func (sc *statsCache) Erase(deletedID int64) bool { + table, hit := sc.lookupUnsafe(deletedID) + if !hit { + return false + } + + key := statsCacheKey(deletedID) + sc.cache.Delete(key) + sc.memTracker.Consume(-table.MemoryUsage()) + return true +} + +// Update updates the statistics table cache. +func (sc *statsCache) Update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.version <= newVersion { + sc.version = newVersion + for _, id := range deletedIDs { + sc.Erase(id) + } + for _, tbl := range tables { + sc.Insert(tbl) + } + } +} + +func (sc *statsCache) GetVersion() uint64 { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.version +} + +// initStatsCache should be invoked after the tables and their stats are initialized +// using tables map and version to init statsCache +func (sc *statsCache) initStatsCache(tables map[int64]*statistics.Table, version uint64) { + sc.mu.Lock() + defer sc.mu.Unlock() + for _, tbl := range tables { + sc.Insert(tbl) + } + sc.version = version + return +} diff --git a/statistics/handle/statscache_test.go b/statistics/handle/statscache_test.go new file mode 100644 index 0000000000000..f2c5f3439ad19 --- /dev/null +++ b/statistics/handle/statscache_test.go @@ -0,0 +1,262 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle_test + +import ( + "fmt" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/util/testkit" +) + +func (s *testStatsSuite) TestStatsCacheMiniMemoryLimit(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + testKit.MustExec("use test") + testKit.MustExec("create table t1 (c1 int, c2 int)") + testKit.MustExec("insert into t1 values(1, 2)") + do := s.do + is := do.InfoSchema() + tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + tableInfo1 := tbl1.Meta() + statsTbl1 := do.StatsHandle().GetTableStats(tableInfo1) + c.Assert(statsTbl1.Pseudo, IsTrue) + + testKit.MustExec("analyze table t1") + statsTbl1 = do.StatsHandle().GetTableStats(tableInfo1) + c.Assert(statsTbl1.Pseudo, IsFalse) + + // set new BytesLimit + BytesLimit := int64(90000) + + do.StatsHandle().SetBytesLimit4Test(BytesLimit) + // create t2 and kick t1 of cache + testKit.MustExec("create table t2 (c1 int, c2 int)") + testKit.MustExec("insert into t2 values(1, 2)") + do = s.do + is = do.InfoSchema() + tbl2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + c.Assert(err, IsNil) + tableInfo2 := tbl2.Meta() + statsTbl2 := do.StatsHandle().GetTableStats(tableInfo2) + statsTbl1 = do.StatsHandle().GetTableStats(tableInfo1) + + c.Assert(statsTbl2.Pseudo, IsTrue) + testKit.MustExec("analyze table t2") + tbl2, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + c.Assert(err, IsNil) + + statsTbl2 = do.StatsHandle().GetTableStats(tableInfo2) + c.Assert(statsTbl2.Pseudo, IsFalse) + + c.Assert(BytesLimit >= do.StatsHandle().GetMemConsumed(), IsTrue) +} + +func (s *testStatsSuite) TestLoadHistWithLimit(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + h := s.do.StatsHandle() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() + BytesLimit := int64(300000) + h.SetBytesLimit4Test(BytesLimit) + + testKit.MustExec("use test") + testKit.MustExec("create table t1(c int)") + testKit.MustExec("insert into t1 values(1),(2),(3),(4),(5)") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + testKit.MustExec("analyze table t1") + h.Clear4Test() + h.SetBytesLimit4Test(BytesLimit) + + c.Assert(h.Update(s.do.InfoSchema()), IsNil) + result := testKit.MustQuery("show stats_histograms where Table_name = 't1'") + c.Assert(len(result.Rows()), Equals, 0) + testKit.MustExec("explain select * from t1 where c = 1") + c.Assert(h.LoadNeededHistograms(), IsNil) + result = testKit.MustQuery("show stats_histograms where Table_name = 't1'") + c.Assert(len(result.Rows()), Equals, 1) + c.Assert(result.Rows()[0][9], Equals, "1") + c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue) + + // create new table + testKit.MustExec("create table t2(c int)") + testKit.MustExec("insert into t2 values(1),(2),(3),(4),(5)") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + testKit.MustExec("analyze table t2") + c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue) + +} + +func (s *testStatsSuite) TestLoadHistWithInvalidIndex(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + h := s.do.StatsHandle() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() + BytesLimit := int64(300000) + h.SetBytesLimit4Test(BytesLimit) + + testKit.MustExec("use test") + testKit.MustExec("create table t1(c int)") + testKit.MustExec("insert into t1 values(1),(2),(3),(4),(5)") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + testKit.MustExec("create index idx_t on t1(c)") + + testKit.MustExec("analyze table t1") + // update all information to statscache + c.Assert(h.Update(s.do.InfoSchema()), IsNil) + + tbl1, err := s.do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + tableInfo1 := tbl1.Meta() + + // erase old table + h.EraseTable4Test(tbl1.Meta().ID) + + // add empty table + statsTbl1 := h.GetTableStats(tableInfo1) + c.Assert(statsTbl1.Indices[tbl1.Meta().Indices[0].ID].Len() == 0, IsTrue) + // load index + for _, v := range statsTbl1.Indices { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsTrue) + } + for _, v := range statsTbl1.Columns { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsTrue) + } + c.Assert(h.LoadNeededHistograms(), IsNil) + c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue) + statsTbl1new := h.GetTableStats(tableInfo1) + c.Assert(statsTbl1new.Indices[tbl1.Meta().Indices[0].ID].Len() > 0, IsTrue) + + c.Assert(statsTbl1new.Indices[tbl1.Meta().Indices[0].ID].String(), Equals, "index:1 ndv:5\n"+ + "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1\n"+ + "num: 1 lower_bound: 2 upper_bound: 2 repeats: 1\n"+ + "num: 1 lower_bound: 3 upper_bound: 3 repeats: 1\n"+ + "num: 1 lower_bound: 4 upper_bound: 4 repeats: 1\n"+ + "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1") + c.Assert(statsTbl1new.Columns[tbl1.Meta().Columns[0].ID].String(), Equals, "column:1 ndv:5 totColSize:5\n"+ + "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1\n"+ + "num: 1 lower_bound: 2 upper_bound: 2 repeats: 1\n"+ + "num: 1 lower_bound: 3 upper_bound: 3 repeats: 1\n"+ + "num: 1 lower_bound: 4 upper_bound: 4 repeats: 1\n"+ + "num: 1 lower_bound: 5 upper_bound: 5 repeats: 1") +} + +func (s *testStatsSuite) TestManyTableChange(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + h := s.do.StatsHandle() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() + + BytesLimit := int64(300000) + h.SetBytesLimit4Test(BytesLimit) + tableSize := 100 + testKit.MustExec("use test") + for i := 0; i <= tableSize; i++ { + testKit.MustExec(fmt.Sprintf("create table t%d(c int)", i)) + testKit.MustExec(fmt.Sprintf("insert into t%d values(1),(2),(3)", i)) + testKit.MustExec(fmt.Sprintf("analyze table t%d", i)) + } + + // update all information to statscache + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + + c.Assert(h.Update(s.do.InfoSchema()), IsNil) + for i := 0; i <= tableSize; i++ { + tableName := fmt.Sprintf("t%d", i) + tbl, err := s.do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableName)) + c.Assert(err, IsNil) + tableInfo := tbl.Meta() + + // add empty table + statsTbl := h.GetTableStats(tableInfo) + + // load indices and column + for _, v := range statsTbl.Indices { + v.IsInvalid(&stmtctx.StatementContext{}, false) + } + + for _, v := range statsTbl.Columns { + v.IsInvalid(&stmtctx.StatementContext{}, false) + } + c.Assert(h.LoadNeededHistograms(), IsNil) + c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue) + statsTblnew := h.GetTableStats(tableInfo) + c.Assert(statsTblnew.MemoryUsage() > 0, IsTrue) + + for _, v := range statsTblnew.Columns { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse) + } + for _, v := range statsTblnew.Indices { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse) + } + + } +} + +func (s *testStatsSuite) TestManyTableChangeWithQuery(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + h := s.do.StatsHandle() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() + + BytesLimit := int64(300000) + h.SetBytesLimit4Test(BytesLimit) + tableSize := 100 + testKit.MustExec("use test") + for i := 0; i <= tableSize; i++ { + testKit.MustExec(fmt.Sprintf("create table t%d(a int,b int,index idx(b))", i)) + testKit.MustExec(fmt.Sprintf("insert into t%d values(1,2),(2,5),(3,5)", i)) + testKit.MustExec(fmt.Sprintf("analyze table t%d", i)) + } + + // update all information to statscache + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + + c.Assert(h.Update(s.do.InfoSchema()), IsNil) + for i := 0; i <= tableSize; i++ { + tableName := fmt.Sprintf("t%d", i) + tbl, err := s.do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableName)) + c.Assert(err, IsNil) + tableInfo := tbl.Meta() + testKit.MustQuery(fmt.Sprintf("select * from t%d use index(idx) where b <= 5", i)) + testKit.MustQuery(fmt.Sprintf("select * from t%d where a > 1", i)) + testKit.MustQuery(fmt.Sprintf("select * from t%d use index(idx) where b = 5", i)) + + c.Assert(h.LoadNeededHistograms(), IsNil) + c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue) + statsTblNew := h.GetTableStats(tableInfo) + c.Assert(statsTblNew.MemoryUsage() > 0, IsTrue) + + for _, v := range statsTblNew.Columns { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse) + } + for _, v := range statsTblNew.Indices { + c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse) + } + + } +} diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 38f9b3c70aae2..f16ffe66d5e03 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -221,7 +221,7 @@ func needDumpStatsDelta(h *Handle, id int64, item variable.TableDelta, currentTi if item.InitTime.IsZero() { item.InitTime = currentTime } - tbl, ok := h.statsCache.Load().(statsCache).tables[id] + tbl, ok := h.statsCache.Lookup(id) if !ok { // No need to dump if the stats is invalid. return false @@ -383,7 +383,7 @@ func (h *Handle) DumpStatsFeedbackToKV() error { if fb.Tp == statistics.PkType { err = h.DumpFeedbackToKV(fb) } else { - t, ok := h.statsCache.Load().(statsCache).tables[fb.PhysicalID] + t, ok := h.statsCache.Lookup(fb.PhysicalID) if ok { err = h.DumpFeedbackForIndex(fb, t) } @@ -464,8 +464,7 @@ func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag) newTblStats.Columns[fb.Hist.ID] = &newCol } - oldCache := h.statsCache.Load().(statsCache) - h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) + h.statsCache.Update([]*statistics.Table{newTblStats}, nil, h.statsCache.GetVersion()) } } } @@ -497,8 +496,7 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) { delete(h.mu.rateMap, id) } h.mu.Unlock() - oldCache := h.statsCache.Load().(statsCache) - h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) + h.statsCache.Update(tbls, nil, h.statsCache.GetVersion()) } // HandleUpdateStats update the stats using feedback. @@ -899,7 +897,7 @@ func logForIndex(prefix string, t *statistics.Table, idx *statistics.Index, rang } func (h *Handle) logDetailedInfo(q *statistics.QueryFeedback) { - t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] + t, ok := h.statsCache.Lookup(q.PhysicalID) if !ok { return } @@ -940,7 +938,7 @@ func logForPK(prefix string, c *statistics.Column, ranges []*ranger.Range, actua // RecalculateExpectCount recalculates the expect row count if the origin row count is estimated by pseudo. func (h *Handle) RecalculateExpectCount(q *statistics.QueryFeedback) error { - t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] + t, ok := h.statsCache.Lookup(q.PhysicalID) if !ok { return nil } diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go index 4c41057699a65..67d1ef7a1b063 100644 --- a/statistics/handle/update_test.go +++ b/statistics/handle/update_test.go @@ -532,7 +532,7 @@ func (s *testStatsSuite) TestTableAnalyzed(c *C) { statsTbl = h.GetTableStats(tableInfo) c.Assert(handle.TableAnalyzed(statsTbl), IsTrue) - h.Clear() + h.Clear4Test() oriLease := h.Lease() // set it to non-zero so we will use load by need strategy h.SetLease(1) @@ -1733,7 +1733,7 @@ func (s *testStatsSuite) TestLoadHistCorrelation(c *C) { testKit.MustExec("insert into t values(1),(2),(3),(4),(5)") c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) testKit.MustExec("analyze table t") - h.Clear() + h.Clear4Test() c.Assert(h.Update(s.do.InfoSchema()), IsNil) result := testKit.MustQuery("show stats_histograms where Table_name = 't'") c.Assert(len(result.Rows()), Equals, 0) diff --git a/statistics/histogram.go b/statistics/histogram.go index 8ae95e64cb692..851f5a62eac1d 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -122,7 +122,10 @@ func (hg *Histogram) MemoryUsage() (sum int64) { if hg == nil { return } - sum = hg.Bounds.MemoryUsage() + int64(cap(hg.Buckets)*int(unsafe.Sizeof(Bucket{}))) + int64(cap(hg.scalars)*int(unsafe.Sizeof(scalar{}))) + //let the initial sum = 0 + sum = hg.Bounds.MemoryUsage() - chunk.NewChunkWithCapacity([]*types.FieldType{hg.Tp}, 0).MemoryUsage() + sum = sum + int64(cap(hg.Buckets)*int(unsafe.Sizeof(Bucket{}))) + int64(cap(hg.scalars)*int(unsafe.Sizeof(scalar{}))) + return } @@ -897,6 +900,7 @@ type Index struct { StatsVer int64 // StatsVer is the version of the current stats, used to maintain compatibility Info *model.IndexInfo Flag int64 + PhysicalID int64 // PhysicalID for lazy load LastAnalyzePos types.Datum } @@ -904,9 +908,22 @@ func (idx *Index) String() string { return idx.Histogram.ToString(len(idx.Info.Columns)) } -// IsInvalid checks if this index is invalid. -func (idx *Index) IsInvalid(collPseudo bool) bool { - return (collPseudo && idx.NotAccurate()) || idx.TotalRowCount() == 0 +// HistogramNeededIndices stores the Index whose Histograms need to be loaded from physical kv layer. +// Currently, we only load index/pk's Histogram from kv automatically. Columns' are loaded by needs. +var HistogramNeededIndices = neededIndexMap{idxs: map[tableIndexID]struct{}{}} + +// IsInvalid checks if this Index is invalid. +// If this Index has histogram but not loaded yet, then we mark it +// as need Index. +func (idx *Index) IsInvalid(sc *stmtctx.StatementContext, collPseudo bool) bool { + if collPseudo && idx.NotAccurate() { + return true + } + if idx.NDV > 0 && idx.Len() == 0 && sc != nil { + sc.SetHistogramsNotLoad() + HistogramNeededIndices.insert(tableIndexID{TableID: idx.PhysicalID, IndexID: idx.Info.ID}) + } + return idx.TotalRowCount() == 0 || (idx.NDV > 0 && idx.Len() == 0) } // MemoryUsage returns the total memory usage of a Histogram and CMSketch in Index. diff --git a/statistics/selectivity_test.go b/statistics/selectivity_test.go index 69ad9e178f4fb..22724876ee80c 100644 --- a/statistics/selectivity_test.go +++ b/statistics/selectivity_test.go @@ -146,7 +146,7 @@ func cleanEnv(c *C, store kv.Storage, do *domain.Domain) { tk.MustExec("delete from mysql.stats_meta") tk.MustExec("delete from mysql.stats_histograms") tk.MustExec("delete from mysql.stats_buckets") - do.StatsHandle().Clear() + do.StatsHandle().Clear4Test() } // generateIntDatum will generate a datum slice, every dimension is begin from 0, end with num - 1. diff --git a/statistics/table.go b/statistics/table.go index e080e755b1061..d32e6eafd6d51 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -14,6 +14,7 @@ package statistics import ( + "encoding/binary" "fmt" "math" "sort" @@ -22,6 +23,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" @@ -56,6 +58,27 @@ const ( PseudoRowCount = 10000 ) +// CMSketchSizeLimit indicates the max width and depth of CMSketch. +var CMSketchSizeLimit = kv.TxnEntrySizeLimit / binary.MaxVarintLen32 + +// AnalyzeOptionLimit indicates the upper bound of some attribute. +var AnalyzeOptionLimit = map[ast.AnalyzeOptionType]uint64{ + ast.AnalyzeOptNumBuckets: 1024, + ast.AnalyzeOptNumTopN: 1024, + ast.AnalyzeOptCMSketchWidth: CMSketchSizeLimit, + ast.AnalyzeOptCMSketchDepth: CMSketchSizeLimit, + ast.AnalyzeOptNumSamples: 100000, +} + +// AnalyzeOptionDefault indicates the default values of some attributes. +var AnalyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ + ast.AnalyzeOptNumBuckets: 256, + ast.AnalyzeOptNumTopN: 20, + ast.AnalyzeOptCMSketchWidth: 2048, + ast.AnalyzeOptCMSketchDepth: 5, + ast.AnalyzeOptNumSamples: 10000, +} + // Table represents statistics for a table. type Table struct { HistColl @@ -159,6 +182,48 @@ func (t *Table) Copy() *Table { return nt } +// CopyWithoutBucketsAndCMS copies the current table only with metadata. +func (t *Table) CopyWithoutBucketsAndCMS() *Table { + newHistColl := HistColl{ + PhysicalID: t.PhysicalID, + HavePhysicalID: t.HavePhysicalID, + Count: t.Count, + Columns: make(map[int64]*Column, len(t.Columns)), + Indices: make(map[int64]*Index, len(t.Indices)), + Pseudo: t.Pseudo, + ModifyCount: t.ModifyCount, + } + for id, col := range t.Columns { + oldHg := &col.Histogram + newHg := NewHistogram(oldHg.ID, oldHg.NDV, oldHg.NullCount, oldHg.LastUpdateVersion, oldHg.Tp, 0, oldHg.TotColSize) + newHistColl.Columns[id] = &Column{ + Histogram: *newHg, + PhysicalID: col.PhysicalID, + Info: col.Info, + Count: col.Count, + IsHandle: col.IsHandle, + Flag: col.Flag, + } + } + for id, idx := range t.Indices { + oldHg := &idx.Histogram + newHg := NewHistogram(oldHg.ID, oldHg.NDV, oldHg.NullCount, oldHg.LastUpdateVersion, oldHg.Tp, 0, oldHg.TotColSize) + newHistColl.Indices[id] = &Index{ + Histogram: *newHg, + PhysicalID: idx.PhysicalID, + Info: idx.Info, + StatsVer: idx.StatsVer, + Flag: idx.Flag, + } + } + nt := &Table{ + HistColl: newHistColl, + Version: t.Version, + Name: t.Name, + } + return nt +} + // String implements Stringer interface. func (t *Table) String() string { strs := make([]string, 0, len(t.Columns)+1) @@ -235,6 +300,40 @@ func (n *neededColumnMap) Delete(col tableColumnID) { n.m.Unlock() } +type tableIndexID struct { + TableID int64 + IndexID int64 +} + +type neededIndexMap struct { + m sync.Mutex + idxs map[tableIndexID]struct{} +} + +// AllIdxs returns all the idx with an array +func (n *neededIndexMap) AllIdxs() []tableIndexID { + n.m.Lock() + keys := make([]tableIndexID, 0, len(n.idxs)) + for key := range n.idxs { + keys = append(keys, key) + } + n.m.Unlock() + return keys +} + +func (n *neededIndexMap) insert(idx tableIndexID) { + n.m.Lock() + n.idxs[idx] = struct{}{} + n.m.Unlock() +} + +// Delete delete a idx from idxs +func (n *neededIndexMap) Delete(idx tableIndexID) { + n.m.Lock() + delete(n.idxs, idx) + n.m.Unlock() +} + // RatioOfPseudoEstimate means if modifyCount / statsTblCount is greater than this ratio, we think the stats is invalid // and use pseudo estimation. var RatioOfPseudoEstimate = atomic.NewFloat64(0.7) @@ -320,7 +419,7 @@ func (coll *HistColl) GetRowCountByColumnRanges(sc *stmtctx.StatementContext, co // GetRowCountByIndexRanges estimates the row count by a slice of Range. func (coll *HistColl) GetRowCountByIndexRanges(sc *stmtctx.StatementContext, idxID int64, indexRanges []*ranger.Range) (float64, error) { idx := coll.Indices[idxID] - if idx == nil || idx.IsInvalid(coll.Pseudo) { + if idx == nil || idx.IsInvalid(sc, coll.Pseudo) { colsLen := -1 if idx != nil && idx.Info.Unique { colsLen = len(idx.Info.Columns) diff --git a/tidb-server/main.go b/tidb-server/main.go index b73988b1c0bf3..33fdfb10e392e 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -556,6 +556,7 @@ func setGlobalVars() { variable.SetSysVar(variable.TiDBForcePriority, mysql.Priority2Str[priority]) variable.SetSysVar(variable.TiDBOptDistinctAggPushDown, variable.BoolToIntStr(cfg.Performance.DistinctAggPushDown)) variable.SetSysVar(variable.TIDBMemQuotaQuery, strconv.FormatInt(cfg.MemQuotaQuery, 10)) + variable.SetSysVar(variable.TIDBMemQuotaStatistics, strconv.FormatInt(cfg.MemQuotaStatistics, 10)) variable.SetSysVar("lower_case_table_names", strconv.Itoa(cfg.LowerCaseTableNames)) variable.SetSysVar(variable.LogBin, variable.BoolToIntStr(config.GetGlobalConfig().Binlog.Enable)) variable.SetSysVar(variable.Port, fmt.Sprintf("%d", cfg.Port)) diff --git a/util/kvcache/simple_lru.go b/util/kvcache/simple_lru.go index 46e19ecc5c712..58c0cdb899d39 100644 --- a/util/kvcache/simple_lru.go +++ b/util/kvcache/simple_lru.go @@ -76,6 +76,17 @@ func NewSimpleLRUCache(capacity uint, guard float64, quota uint64) *SimpleLRUCac } } +// GetAll try to get all value. +func (l *SimpleLRUCache) GetAll() []interface{} { + values := make([]interface{}, 0) + for _, v := range l.elements { + if nv, ok := v.Value.(*cacheEntry); ok { + values = append(values, nv.value) + } + } + return values +} + // Get tries to find the corresponding value according to the given key. func (l *SimpleLRUCache) Get(key Key) (value Value, ok bool) { element, exists := l.elements[string(key.Hash())]