Skip to content

Commit

Permalink
statistics: reuse fmsketch (#47070)
Browse files Browse the repository at this point in the history
close #47071
  • Loading branch information
hawkingrei authored Sep 19, 2023
1 parent b3ec110 commit bb49dc1
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 26 deletions.
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ go_library(
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
9 changes: 7 additions & 2 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type globalStatsKey struct {
Expand Down Expand Up @@ -53,10 +54,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs))

tableAllPartitionStats := make(map[int64]*statistics.Table)
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)
maps.Clear(tableAllPartitionStats)

for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
Expand Down Expand Up @@ -126,6 +127,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob

FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr)
}

for _, value := range tableAllPartitionStats {
value.ReleaseAndPutToPool()
}
}

for tableID := range tableIDs {
Expand Down
49 changes: 34 additions & 15 deletions statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
"github.com/twmb/murmur3"
"golang.org/x/exp/maps"
)

var murmur3Pool = sync.Pool{
Expand All @@ -32,6 +33,15 @@ var murmur3Pool = sync.Pool{
},
}

// fmSketchPool holds FMSketch values for reuse. A freshly created entry has an
// empty, non-nil hashset; its other fields are left at their zero values.
var fmSketchPool = sync.Pool{
	New: func() any {
		sketch := new(FMSketch)
		sketch.hashset = make(map[uint64]bool)
		return sketch
	},
}

// FMSketch is used to count the number of distinct elements in a set.
type FMSketch struct {
hashset map[uint64]bool
Expand All @@ -41,26 +51,22 @@ type FMSketch struct {

// NewFMSketch returns a new FM sketch.
func NewFMSketch(maxSize int) *FMSketch {
return &FMSketch{
hashset: make(map[uint64]bool),
maxSize: maxSize,
}
result := fmSketchPool.Get().(*FMSketch)
result.maxSize = maxSize
return result
}

// Copy makes a copy for current FMSketch.
func (s *FMSketch) Copy() *FMSketch {
if s == nil {
return nil
}
hashset := make(map[uint64]bool)
result := NewFMSketch(s.maxSize)
for key, value := range s.hashset {
hashset[key] = value
}
return &FMSketch{
hashset: hashset,
mask: s.mask,
maxSize: s.maxSize,
result.hashset[key] = value
}
result.mask = s.mask
return result
}

// NDV returns the ndv of the sketch.
Expand Down Expand Up @@ -159,10 +165,8 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch {
if protoSketch == nil {
return nil
}
sketch := &FMSketch{
hashset: make(map[uint64]bool, len(protoSketch.Hashset)),
mask: protoSketch.Mask,
}
sketch := fmSketchPool.Get().(*FMSketch)
sketch.mask = protoSketch.Mask
for _, val := range protoSketch.Hashset {
sketch.hashset[val] = true
}
Expand Down Expand Up @@ -201,3 +205,18 @@ func (s *FMSketch) MemoryUsage() (sum int64) {
sum = int64(16 + 9*len(s.hashset))
return
}

// reset empties the sketch: every entry is removed from hashset (the map
// itself is kept) and mask and maxSize are set back to 0.
func (s *FMSketch) reset() {
	s.mask, s.maxSize = 0, 0
	maps.Clear(s.hashset)
}

// DestroyAndPutToPool clears the FMSketch and hands it to fmSketchPool.
// Calling it on a nil receiver does nothing.
func (s *FMSketch) DestroyAndPutToPool() {
	if s != nil {
		s.reset()
		fmSketchPool.Put(s)
	}
}
25 changes: 16 additions & 9 deletions statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
partitionNum := len(globalTableInfo.Partition.Definitions)
externalCache := false
if allPartitionStats == nil {
allPartitionStats = make(map[int64]*statistics.Table)
} else {
externalCache = true
}
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
Expand Down Expand Up @@ -133,12 +139,10 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(

tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
if allPartitionStats != nil {
partitionStats, ok = allPartitionStats[partitionID]
}

// If preload partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats.
if allPartitionStats == nil || partitionStats == nil || !ok {
partitionStats, ok = allPartitionStats[partitionID]
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
if !ok {
var err1 error
partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
if err1 != nil {
Expand All @@ -149,9 +153,6 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
err = err1
return
}
if allPartitionStats == nil {
allPartitionStats = make(map[int64]*statistics.Table)
}
allPartitionStats[partitionID] = partitionStats
}

Expand Down Expand Up @@ -249,16 +250,22 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
globalStats.Fms[i] = allFms[i][0].Copy()
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
}

// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Fms[i].DestroyAndPutToPool()
globalStats.Hg[i].NDV = globalStatsNDV
}

if !externalCache {
for _, value := range allPartitionStats {
value.ReleaseAndPutToPool()
}
}
return
}

Expand Down
12 changes: 12 additions & 0 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,18 @@ func (t *Table) IsOutdated() bool {
return false
}

// ReleaseAndPutToPool hands the FMSketch of every column and every index of
// the Table to DestroyAndPutToPool, then empties the Columns and Indices maps.
// Only the sketches are released this way; the Table value itself is not put
// into any pool by this method.
func (t *Table) ReleaseAndPutToPool() {
	for _, column := range t.Columns {
		column.FMSketch.DestroyAndPutToPool()
	}
	for _, index := range t.Indices {
		index.FMSketch.DestroyAndPutToPool()
	}
	// Drop the entries so the released sketches are no longer reachable
	// through this Table.
	maps.Clear(t.Columns)
	maps.Clear(t.Indices)
}

// ID2UniqueID generates a new HistColl whose `Columns` is built from UniqueID of given columns.
func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl {
cols := make(map[int64]*Column)
Expand Down

0 comments on commit bb49dc1

Please sign in to comment.