Skip to content

Commit

Permalink
statistics: support reject for LFU cache (#46090)
Browse files Browse the repository at this point in the history
close #46112
  • Loading branch information
hawkingrei authored Aug 15, 2023
1 parent 04f96b5 commit b69fa21
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 40 deletions.
4 changes: 0 additions & 4 deletions statistics/handle/cache/internal/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ type StatsCacheInner interface {
Copy() StatsCacheInner
// SetCapacity sets the capacity of the cache
SetCapacity(int64)

// Front returns the front element's owner tableID, only used for test
// TODO: this method is mainly for test, remove it in the future.
Front() int64
// Close stops the cache
Close()
}
3 changes: 2 additions & 1 deletion statistics/handle/cache/internal/lfu/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ go_test(
embed = [":lfu"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
"//statistics",
"//statistics/handle/cache/internal/testutil",
"@com_github_stretchr_testify//require",
],
Expand Down
11 changes: 1 addition & 10 deletions statistics/handle/cache/internal/lfu/key_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,10 @@ func (ks *keySet) Len() int {
return result
}

func (ks *keySet) AddKeyValue(key int64, value *statistics.Table) (cost int64) {
func (ks *keySet) AddKeyValue(key int64, value *statistics.Table) {
ks.mu.Lock()
if v, ok := ks.set[key]; ok && v != nil {
cost = v.MemoryUsage().TotalTrackingMemUsage()
}
ks.set[key] = value
ks.mu.Unlock()
if value != nil {
cost = value.MemoryUsage().TotalTrackingMemUsage() - cost
} else {
cost = -cost
}
return cost
}

func (ks *keySet) Get(key int64) (*statistics.Table, bool) {
Expand Down
26 changes: 13 additions & 13 deletions statistics/handle/cache/internal/lfu/lfu_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,15 @@ func (s *LFU) Get(tid int64, _ bool) (*statistics.Table, bool) {

// Put implements statsCacheInner
func (s *LFU) Put(tblID int64, tbl *statistics.Table) bool {
ok := s.cache.Set(tblID, tbl, tbl.MemoryUsage().TotalTrackingMemUsage())
if ok { // NOTE: `s.cache` and `s.resultKeySet` may be inconsistent since the update operation is not atomic, but it's acceptable for our scenario
s.resultKeySet.AddKeyValue(tblID, tbl)
s.cost.Add(tbl.MemoryUsage().TotalTrackingMemUsage())
}
cost := tbl.MemoryUsage().TotalTrackingMemUsage()
// Here we need to insert resultKeySet first and then write to LFU,
// in order to prevent data race. If the LFU cost is already full,
// a rejection may occur, triggering the onEvict event.
// Both inserting into resultKeySet and evicting will modify the memory cost,
// so we need to stagger these two actions.
s.resultKeySet.AddKeyValue(tblID, tbl)
s.cost.Add(cost)
ok := s.cache.Set(tblID, tbl, cost)
metrics.CostGauge.Set(float64(s.cost.Load()))
return ok
}
Expand All @@ -102,8 +106,8 @@ func (s *LFU) Cost() int64 {
func (s *LFU) Values() []*statistics.Table {
result := make([]*statistics.Table, 0, 512)
for _, k := range s.resultKeySet.Keys() {
if value, ok := s.cache.Get(k); ok {
result = append(result, value.(*statistics.Table))
if value, ok := s.resultKeySet.Get(k); ok {
result = append(result, value)
}
}
return result
Expand All @@ -117,8 +121,9 @@ func DropEvicted(item statistics.TableCacheItem) {
item.DropUnnecessaryData()
}

func (*LFU) onReject(*ristretto.Item) {
func (s *LFU) onReject(item *ristretto.Item) {
metrics.RejectCounter.Add(1.0)
s.onEvict(item)
}

func (s *LFU) onEvict(item *ristretto.Item) {
Expand Down Expand Up @@ -159,11 +164,6 @@ func (s *LFU) Len() int {
return s.resultKeySet.Len()
}

// Front implements statsCacheInner
func (*LFU) Front() int64 {
return 0
}

// Copy implements statsCacheInner
func (s *LFU) Copy() internal.StatsCacheInner {
return s
Expand Down
37 changes: 35 additions & 2 deletions statistics/handle/cache/internal/lfu/lfu_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package lfu
import (
"sync"
"testing"
"time"

"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache/internal/testutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -46,11 +48,14 @@ func TestLFUPutGetDel(t *testing.T) {
}

func TestLFUFreshMemUsage(t *testing.T) {
lfu, err := NewLFU(1000)
lfu, err := NewLFU(10000)
require.NoError(t, err)
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
require.Equal(t, mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalMemUsage)
t2 := testutil.NewMockStatisticsTable(2, 2, true, false, false)
require.Equal(t, 2*mockCMSMemoryUsage+2*mockCMSMemoryUsage, t2.MemoryUsage().TotalMemUsage)
t3 := testutil.NewMockStatisticsTable(3, 3, true, false, false)
require.Equal(t, 3*mockCMSMemoryUsage+3*mockCMSMemoryUsage, t3.MemoryUsage().TotalMemUsage)
lfu.Put(int64(1), t1)
lfu.Put(int64(2), t2)
lfu.Put(int64(3), t3)
Expand All @@ -59,7 +64,7 @@ func TestLFUFreshMemUsage(t *testing.T) {
t4 := testutil.NewMockStatisticsTable(2, 1, true, false, false)
lfu.Put(int64(1), t4)
lfu.wait()
require.Equal(t, lfu.Cost(), 6*mockCMSMemoryUsage+7*mockCMSMemoryUsage)
require.Equal(t, lfu.Cost(), 7*mockCMSMemoryUsage+6*mockCMSMemoryUsage)
t5 := testutil.NewMockStatisticsTable(2, 2, true, false, false)
lfu.Put(int64(1), t5)
lfu.wait()
Expand Down Expand Up @@ -164,3 +169,31 @@ func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) {
require.Equal(t, uint64(lfu.Cost()), lfu.metrics().CostAdded()-lfu.metrics().CostEvicted())
require.Equal(t, 1000, len(lfu.Values()))
}

func TestLFUReject(t *testing.T) {
capacity := int64(100000000000)
lfu, err := NewLFU(capacity)
require.NoError(t, err)
t1 := testutil.NewMockStatisticsTable(2, 1, true, false, false)
require.Equal(t, 2*mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalTrackingMemUsage())
lfu.Put(1, t1)
lfu.wait()
require.Equal(t, lfu.Cost(), 2*mockCMSMemoryUsage+mockCMSMemoryUsage)

lfu.SetCapacity(2*mockCMSMemoryUsage + mockCMSMemoryUsage - 1)

t2 := testutil.NewMockStatisticsTable(2, 1, true, false, false)
require.True(t, lfu.Put(2, t2))
lfu.wait()
time.Sleep(3 * time.Second)
require.Equal(t, int64(12), lfu.Cost())
require.Len(t, lfu.Values(), 2)
v, ok := lfu.Get(2, false)
require.True(t, ok)
for _, c := range v.Columns {
require.Equal(t, c.GetEvictedStatus(), statistics.AllEvicted)
}
for _, i := range v.Indices {
require.Equal(t, i.GetEvictedStatus(), statistics.AllEvicted)
}
}
5 changes: 0 additions & 5 deletions statistics/handle/cache/internal/mapcache/map_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,5 @@ func (m *MapCache) Copy() internal.StatsCacheInner {
// SetCapacity implements StatsCacheInner
func (*MapCache) SetCapacity(int64) {}

// Front implements StatsCacheInner
func (*MapCache) Front() int64 {
return 0
}

// Close implements StatsCacheInner
func (*MapCache) Close() {}
5 changes: 0 additions & 5 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ func (sc *StatsCache) Version() uint64 {
return sc.maxTblStatsVer.Load()
}

// Front returns the front element's owner tableID, only used for test.
func (sc *StatsCache) Front() int64 {
return sc.c.Front()
}

// CopyAndUpdate copies a new cache and updates the new statistics table cache. It is only used in the COW mode.
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
option := &TableStatsOption{}
Expand Down

0 comments on commit b69fa21

Please sign in to comment.