From bb49dc17021924d5880bee0c7be06cce44b49a85 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 19 Sep 2023 23:48:14 +0800 Subject: [PATCH] statstics: reuse fmsketch (#47070) close pingcap/tidb#47071 --- executor/BUILD.bazel | 1 + executor/analyze_global_stats.go | 9 +++- statistics/fmsketch.go | 49 +++++++++++++------ statistics/handle/globalstats/global_stats.go | 25 ++++++---- statistics/table.go | 12 +++++ 5 files changed, 70 insertions(+), 26 deletions(-) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index cca4a87b7fc24..57f64bfc4ea7b 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -275,6 +275,7 @@ go_library( "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", "@org_golang_google_grpc//status", + "@org_golang_x_exp//maps", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 48cdd100c12c7..16c6a1fb901a1 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" + "golang.org/x/exp/maps" ) type globalStatsKey struct { @@ -53,10 +54,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob statsHandle := domain.GetDomain(e.Ctx()).StatsHandle() tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs)) - + tableAllPartitionStats := make(map[int64]*statistics.Table) for tableID := range globalStatsTableIDs { tableIDs[tableID] = struct{}{} - tableAllPartitionStats := make(map[int64]*statistics.Table) + maps.Clear(tableAllPartitionStats) for globalStatsID, info := range globalStatsMap { if globalStatsID.tableID != tableID { @@ -126,6 +127,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr) } + + for _, value := range tableAllPartitionStats { + value.ReleaseAndPutToPool() + } } for tableID := range tableIDs { diff --git a/statistics/fmsketch.go b/statistics/fmsketch.go index 6a3c0c89cb9de..485df1d98cc9b 100644 --- a/statistics/fmsketch.go +++ b/statistics/fmsketch.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tipb/go-tipb" "github.com/twmb/murmur3" + "golang.org/x/exp/maps" ) var murmur3Pool = sync.Pool{ @@ -32,6 +33,15 @@ var murmur3Pool = sync.Pool{ }, } +var fmSketchPool = sync.Pool{ + New: func() any { + return &FMSketch{ + hashset: make(map[uint64]bool), + maxSize: 0, + } + }, +} + // FMSketch is used to count the number of distinct elements in a set. type FMSketch struct { hashset map[uint64]bool @@ -41,10 +51,9 @@ type FMSketch struct { // NewFMSketch returns a new FM sketch. func NewFMSketch(maxSize int) *FMSketch { - return &FMSketch{ - hashset: make(map[uint64]bool), - maxSize: maxSize, - } + result := fmSketchPool.Get().(*FMSketch) + result.maxSize = maxSize + return result } // Copy makes a copy for current FMSketch. @@ -52,15 +61,12 @@ func (s *FMSketch) Copy() *FMSketch { if s == nil { return nil } - hashset := make(map[uint64]bool) + result := NewFMSketch(s.maxSize) for key, value := range s.hashset { - hashset[key] = value - } - return &FMSketch{ - hashset: hashset, - mask: s.mask, - maxSize: s.maxSize, + result.hashset[key] = value } + result.mask = s.mask + return result } // NDV returns the ndv of the sketch. @@ -159,10 +165,8 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch { if protoSketch == nil { return nil } - sketch := &FMSketch{ - hashset: make(map[uint64]bool, len(protoSketch.Hashset)), - mask: protoSketch.Mask, - } + sketch := fmSketchPool.Get().(*FMSketch) + sketch.mask = protoSketch.Mask for _, val := range protoSketch.Hashset { sketch.hashset[val] = true } @@ -201,3 +205,18 @@ func (s *FMSketch) MemoryUsage() (sum int64) { sum = int64(16 + 9*len(s.hashset)) return } + +func (s *FMSketch) reset() { + maps.Clear(s.hashset) + s.mask = 0 + s.maxSize = 0 +} + +// DestroyAndPutToPool resets the FMSketch and puts it to the pool. +func (s *FMSketch) DestroyAndPutToPool() { + if s == nil { + return + } + s.reset() + fmSketchPool.Put(s) +} diff --git a/statistics/handle/globalstats/global_stats.go b/statistics/handle/globalstats/global_stats.go index 67d15ade3d00c..d72d3dd30fa96 100644 --- a/statistics/handle/globalstats/global_stats.go +++ b/statistics/handle/globalstats/global_stats.go @@ -93,6 +93,12 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats( loadTablePartitionStatsFn loadTablePartitionStatsFunc, ) (globalStats *GlobalStats, err error) { partitionNum := len(globalTableInfo.Partition.Definitions) + externalCache := false + if allPartitionStats == nil { + allPartitionStats = make(map[int64]*statistics.Table) + } else { + externalCache = true + } if len(histIDs) == 0 { for _, col := range globalTableInfo.Columns { // The virtual generated column stats can not be merged to the global stats. @@ -133,12 +139,10 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats( tableInfo := partitionTable.Meta() var partitionStats *statistics.Table - if allPartitionStats != nil { - partitionStats, ok = allPartitionStats[partitionID] - } - // If preload partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats. - if allPartitionStats == nil || partitionStats == nil || !ok { + partitionStats, ok = allPartitionStats[partitionID] + // If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats + if !ok { var err1 error partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def) if err1 != nil { @@ -149,9 +153,6 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats( err = err1 return } - if allPartitionStats == nil { - allPartitionStats = make(map[int64]*statistics.Table) - } allPartitionStats[partitionID] = partitionStats } @@ -249,6 +250,7 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats( globalStats.Fms[i] = allFms[i][0].Copy() for j := 1; j < len(allFms[i]); j++ { globalStats.Fms[i].MergeFMSketch(allFms[i][j]) + allFms[i][j].DestroyAndPutToPool() } // Update the global NDV. @@ -256,9 +258,14 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats( if globalStatsNDV > globalStats.Count { globalStatsNDV = globalStats.Count } + globalStats.Fms[i].DestroyAndPutToPool() globalStats.Hg[i].NDV = globalStatsNDV } - + if !externalCache { + for _, value := range allPartitionStats { + value.ReleaseAndPutToPool() + } + } return } diff --git a/statistics/table.go b/statistics/table.go index 6e9bb620addd6..a5d44331db76d 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -505,6 +505,18 @@ func (t *Table) IsOutdated() bool { return false } +// ReleaseAndPutToPool releases data structures of Table and put itself back to pool. +func (t *Table) ReleaseAndPutToPool() { + for _, col := range t.Columns { + col.FMSketch.DestroyAndPutToPool() + } + maps.Clear(t.Columns) + for _, idx := range t.Indices { + idx.FMSketch.DestroyAndPutToPool() + } + maps.Clear(t.Indices) +} + // ID2UniqueID generates a new HistColl whose `Columns` is built from UniqueID of given columns. func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl { cols := make(map[int64]*Column)