Skip to content

Commit

Permalink
planner: simplify StatsCacheInner (pingcap#45256)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jul 10, 2023
1 parent 3443e52 commit cac0db2
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 76 deletions.
4 changes: 0 additions & 4 deletions statistics/handle/cache/internal/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ type StatsCacheInner interface {
Values() []*statistics.Table
// Len returns the length of the cache.
Len() int
FreshMemUsage()
// Copy returns a copy of the cache
Copy() StatsCacheInner
// SetCapacity sets the capacity of the cache
SetCapacity(int64)
// Version returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
Version() (maxTableStatsVersion uint64)

// Front returns the front element's owner tableID, only used for test
// TODO: this method is mainly for test, remove it in the future.
Expand Down
21 changes: 0 additions & 21 deletions statistics/handle/cache/internal/lru/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type StatsInnerCache struct {
// lru maintains item lru cache
lru *innerItemLruCache
sync.RWMutex
maxTableStatsVersion uint64
}

// NewStatsLruCache creates a new LRU cache for statistics.
Expand Down Expand Up @@ -129,9 +128,6 @@ func (s *StatsInnerCache) Put(tblID int64, tbl *statistics.Table) {
}

func (s *StatsInnerCache) put(tblID int64, tbl *statistics.Table, tblMemUsage *statistics.TableMemoryUsage, needMove bool) {
if tbl.Version > s.maxTableStatsVersion {
s.maxTableStatsVersion = tbl.Version
}
element, exist := s.elements[tblID]
if exist {
s.updateColumns(tblID, tbl, tblMemUsage, needMove)
Expand Down Expand Up @@ -270,15 +266,6 @@ func (s *StatsInnerCache) Len() int {
return len(s.elements)
}

// FreshMemUsage implements statsCacheInner
func (s *StatsInnerCache) FreshMemUsage() {
s.Lock()
defer s.Unlock()
for tblID, element := range s.elements {
s.freshTableCost(tblID, element)
}
}

// Copy implements statsCacheInner
func (s *StatsInnerCache) Copy() internal.StatsCacheInner {
s.RLock()
Expand All @@ -289,17 +276,9 @@ func (s *StatsInnerCache) Copy() internal.StatsCacheInner {
newCache.elements[tblID] = element.copy()
}
newCache.lru.onEvict = newCache.onEvict
newCache.maxTableStatsVersion = s.maxTableStatsVersion
return newCache
}

// Version implements statsCacheInner
func (s *StatsInnerCache) Version() uint64 {
s.RLock()
defer s.RUnlock()
return s.maxTableStatsVersion
}

// SetCapacity implements statsCacheInner
func (s *StatsInnerCache) SetCapacity(c int64) {
s.Lock()
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/cache/internal/lru/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func TestLRUFreshMemUsage(t *testing.T) {
lru.Put(int64(3), t3)
require.Equal(t, lru.Cost(), 6*mockCMSMemoryUsage+6*mockCMSMemoryUsage)
mockTableAppendColumn(t1)
lru.FreshMemUsage()
lru.Put(int64(1), t1)
require.Equal(t, lru.Cost(), 6*mockCMSMemoryUsage+7*mockCMSMemoryUsage)
mockTableAppendIndex(t1)
lru.FreshMemUsage()
lru.Put(int64(1), t1)
require.Equal(t, lru.Cost(), 7*mockCMSMemoryUsage+7*mockCMSMemoryUsage)

mockTableRemoveColumn(t1)
Expand Down
27 changes: 4 additions & 23 deletions statistics/handle/cache/internal/mapcache/map_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func (c cacheItem) copy() cacheItem {

// MapCache is a cache based on map.
type MapCache struct {
tables map[int64]cacheItem
memUsage int64
maxTableStatsVersion uint64
tables map[int64]cacheItem
memUsage int64
}

// NewMapCache creates a new map cache.
Expand Down Expand Up @@ -66,9 +65,6 @@ func (m *MapCache) PutByQuery(k int64, v *statistics.Table) {

// Put implements StatsCacheInner
func (m *MapCache) Put(k int64, v *statistics.Table) {
if v.Version > m.maxTableStatsVersion {
m.maxTableStatsVersion = v.Version
}
item, ok := m.tables[k]
if ok {
oldCost := item.cost
Expand Down Expand Up @@ -136,21 +132,11 @@ func (m *MapCache) Len() int {
return len(m.tables)
}

// FreshMemUsage implements StatsCacheInner
func (m *MapCache) FreshMemUsage() {
for _, v := range m.tables {
oldCost := v.cost
newCost := v.value.MemoryUsage().TotalMemUsage
m.memUsage += newCost - oldCost
}
}

// Copy implements StatsCacheInner
func (m *MapCache) Copy() internal.StatsCacheInner {
newM := &MapCache{
tables: make(map[int64]cacheItem, len(m.tables)),
memUsage: m.memUsage,
maxTableStatsVersion: m.maxTableStatsVersion,
tables: make(map[int64]cacheItem, len(m.tables)),
memUsage: m.memUsage,
}
for k, v := range m.tables {
newM.tables[k] = v.copy()
Expand All @@ -165,8 +151,3 @@ func (*MapCache) SetCapacity(int64) {}
func (*MapCache) Front() int64 {
return 0
}

// Version implements StatsCacheInner
func (m *MapCache) Version() uint64 {
return m.maxTableStatsVersion
}
53 changes: 36 additions & 17 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -72,6 +73,8 @@ func NewStatsCache() *StatsCache {
// StatsCache caches the tables in memory for Handle.
type StatsCache struct {
c internal.StatsCacheInner
// the max table stats version the cache has in its lifecycle.
maxTblStatsVer atomic.Uint64
}

// Len returns the number of tables in the cache.
Expand All @@ -80,6 +83,9 @@ func (sc *StatsCache) Len() int {
}

// Get returns the statistics of the specified Table ID.
// The returned value should be read-only, if you update it, don't forget to use Put to put it back again, otherwise the memory trace can be inaccurate.
//
// e.g. v := sc.Get(id); /* update the value */ v.Version = 123; sc.Put(id, v);
func (sc *StatsCache) Get(id int64) (*statistics.Table, bool) {
return sc.c.Get(id)
}
Expand All @@ -93,6 +99,13 @@ func (sc *StatsCache) GetByQuery(id int64) (*statistics.Table, bool) {
// Put puts the table statistics to the cache.
func (sc *StatsCache) Put(id int64, t *statistics.Table) {
sc.c.Put(id, t)

// update the maxTblStatsVer
for v := sc.maxTblStatsVer.Load(); v < t.Version; v = sc.maxTblStatsVer.Load() {
if sc.maxTblStatsVer.CompareAndSwap(v, t.Version) {
break
} // other goroutines have updated the sc.maxTblStatsVer, so we need to check again.
}
}

// Values returns all the cached statistics tables.
Expand All @@ -101,8 +114,13 @@ func (sc *StatsCache) Values() []*statistics.Table {
}

// FreshMemUsage refreshes the memory usage of the cache.
// Values in StatsCache should be read-only, but when initializing the cache, some values can be modified.
// To make the memory cost more accurate, we need to refresh the memory usage of the cache after finishing the initialization.
func (sc *StatsCache) FreshMemUsage() {
sc.c.FreshMemUsage()
values := sc.c.Values()
for _, v := range values {
sc.c.Put(v.PhysicalID, v)
}
}

// Cost returns the memory usage of the cache.
Expand All @@ -118,40 +136,41 @@ func (sc *StatsCache) SetCapacity(c int64) {
// Version returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
func (sc *StatsCache) Version() uint64 {
return sc.c.Version()
return sc.maxTblStatsVer.Load()
}

// Front returns the front element's owner tableID, only used for test.
func (sc *StatsCache) Front() int64 {
return sc.c.Front()
}

// Update updates the statistics table cache using Copy on write.
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
// CopyAndUpdate copies a new cache and updates the new statistics table cache.
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
option := &TableStatsOption{}
for _, opt := range opts {
opt(option)
}
newCache := &StatsCache{}
newCache.c = CopyAndUpdateStatsCache(sc.c, tables, deletedIDs, option.byQuery)
return newCache
}

// CopyAndUpdateStatsCache copies the base cache and applies some updates to the new copied cache, the base cache should keep unchanged.
func CopyAndUpdateStatsCache(base internal.StatsCacheInner, tables []*statistics.Table, deletedIDs []int64, byQuery bool) internal.StatsCacheInner {
c := base.Copy()
newCache := &StatsCache{c: sc.c.Copy()}
newCache.maxTblStatsVer.Store(sc.maxTblStatsVer.Load())
for _, tbl := range tables {
id := tbl.PhysicalID
if byQuery {
c.PutByQuery(id, tbl)
if option.byQuery {
newCache.c.PutByQuery(id, tbl)
} else {
c.Put(id, tbl)
newCache.c.Put(id, tbl)
}
}
for _, id := range deletedIDs {
c.Del(id)
newCache.c.Del(id)
}
return c

// update the maxTblStatsVer
for _, t := range tables {
if t.Version > newCache.maxTblStatsVer.Load() {
newCache.maxTblStatsVer.Store(t.Version)
}
}
return newCache
}

// TableRowStatsCache is the cache of table row count.
Expand Down
12 changes: 6 additions & 6 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...cache.TableStatsOpt) e
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
tables = append(tables, tbl)
}
h.updateStatsCache(oldCache.Update(tables, deletedTableIDs, opts...))
h.updateStatsCache(oldCache.CopyAndUpdate(tables, deletedTableIDs, opts...))
return nil
}

Expand Down Expand Up @@ -996,7 +996,7 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64, opts ...
tbl = statistics.PseudoTable(tblInfo)
tbl.PhysicalID = pid
if tblInfo.GetPartitionInfo() == nil || h.statsCacheLen() < 64 {
h.updateStatsCache(statsCache.Update([]*statistics.Table{tbl}, nil))
h.updateStatsCache(statsCache.CopyAndUpdate([]*statistics.Table{tbl}, nil))
}
return tbl
}
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func (h *Handle) loadNeededColumnHistograms(reader *statistics.StatsReader, col
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
if h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil)) {
statistics.HistogramNeededItems.Delete(col)
}
return nil
Expand Down Expand Up @@ -1155,7 +1155,7 @@ func (h *Handle) loadNeededIndexHistograms(reader *statistics.StatsReader, idx m
}
tbl = tbl.Copy()
tbl.Indices[idx.ID] = idxHist
if h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil)) {
statistics.HistogramNeededItems.Delete(idx)
}
return nil
Expand Down Expand Up @@ -1798,7 +1798,7 @@ func (h *Handle) removeExtendedStatsItem(tableID int64, statsName string) {
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
if h.updateStatsCache(oldCache.Update([]*statistics.Table{newTbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{newTbl}, nil)) {
return
}
if retry == 1 {
Expand Down Expand Up @@ -1831,7 +1831,7 @@ func (h *Handle) ReloadExtendedStatistics() error {
}
tables = append(tables, t)
}
if h.updateStatsCache(oldCache.Update(tables, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate(tables, nil)) {
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
tbl = tbl.Copy()
tbl.Indices[item.ID] = idxHist
}
return h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil, cache.WithTableStatsByQuery()))
return h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil, cache.WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ OUTER:
}
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load()
if h.updateStatsCache(oldCache.Update([]*statistics.Table{newTblStats}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{newTblStats}, nil)) {
break
}
}
Expand Down Expand Up @@ -797,7 +797,7 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
h.mu.Unlock()
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load()
if h.updateStatsCache(oldCache.Update(tbls, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate(tbls, nil)) {
break
}
}
Expand Down

0 comments on commit cac0db2

Please sign in to comment.