Skip to content

Commit 7b80380

Browse files
authored
schema cache: cache schema version by timestamp (#40768)
close #40740
1 parent ba41d92 commit 7b80380

File tree

2 files changed

+117
-26
lines changed

2 files changed

+117
-26
lines changed

infoschema/cache.go

+56-23
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package infoschema
1616

1717
import (
18+
"fmt"
1819
"sort"
1920
"sync"
2021

@@ -36,22 +37,27 @@ var (
3637
// It only promised to cache the infoschema, if it is newer than all the cached.
3738
type InfoCache struct {
3839
mu sync.RWMutex
39-
// cache is sorted by SchemaVersion in descending order
40-
cache []InfoSchema
41-
// record SnapshotTS of the latest schema Insert.
42-
maxUpdatedSnapshotTS uint64
40+
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
41+
cache []schemaAndTimestamp
42+
}
43+
44+
type schemaAndTimestamp struct {
45+
infoschema InfoSchema
46+
timestamp int64
4347
}
4448

4549
// NewCache creates a new InfoCache.
4650
func NewCache(capacity int) *InfoCache {
47-
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
51+
return &InfoCache{
52+
cache: make([]schemaAndTimestamp, 0, capacity),
53+
}
4854
}
4955

5056
// Reset resets the cache.
5157
func (h *InfoCache) Reset(capacity int) {
5258
h.mu.Lock()
5359
defer h.mu.Unlock()
54-
h.cache = make([]InfoSchema, 0, capacity)
60+
h.cache = make([]schemaAndTimestamp, 0, capacity)
5561
}
5662

5763
// GetLatest gets the newest information schema.
@@ -61,18 +67,40 @@ func (h *InfoCache) GetLatest() InfoSchema {
6167
getLatestCounter.Inc()
6268
if len(h.cache) > 0 {
6369
hitLatestCounter.Inc()
64-
return h.cache[0]
70+
return h.cache[0].infoschema
6571
}
6672
return nil
6773
}
6874

75+
// GetSchemaByTimestamp returns the schema used at the specific timestamp
76+
func (h *InfoCache) GetSchemaByTimestamp(ts uint64) (InfoSchema, error) {
77+
h.mu.RLock()
78+
defer h.mu.RUnlock()
79+
return h.getSchemaByTimestampNoLock(ts)
80+
}
81+
82+
func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, error) {
83+
i := sort.Search(len(h.cache), func(i int) bool {
84+
return uint64(h.cache[i].timestamp) <= ts
85+
})
86+
if i < len(h.cache) {
87+
return h.cache[i].infoschema, nil
88+
}
89+
90+
return nil, fmt.Errorf("no schema cached for timestamp %d", ts)
91+
}
92+
6993
// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
7094
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
7195
h.mu.RLock()
7296
defer h.mu.RUnlock()
97+
return h.getByVersionNoLock(version)
98+
}
99+
100+
func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
73101
getVersionCounter.Inc()
74102
i := sort.Search(len(h.cache), func(i int) bool {
75-
return h.cache[i].SchemaMetaVersion() <= version
103+
return h.cache[i].infoschema.SchemaMetaVersion() <= version
76104
})
77105

78106
// `GetByVersion` is allowed to load the latest schema that is less than argument `version`.
@@ -93,9 +121,9 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema {
93121
// }
94122
// ```
95123

96-
if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) {
124+
if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
97125
hitVersionCounter.Inc()
98-
return h.cache[i]
126+
return h.cache[i].infoschema
99127
}
100128
return nil
101129
}
@@ -108,11 +136,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
108136
defer h.mu.RUnlock()
109137

110138
getTSCounter.Inc()
111-
if snapshotTS >= h.maxUpdatedSnapshotTS {
112-
if len(h.cache) > 0 {
113-
hitTSCounter.Inc()
114-
return h.cache[0]
115-
}
139+
if schema, err := h.getSchemaByTimestampNoLock(snapshotTS); err == nil {
140+
hitTSCounter.Inc()
141+
return schema
116142
}
117143
return nil
118144
}
@@ -125,29 +151,36 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
125151
defer h.mu.Unlock()
126152

127153
version := is.SchemaMetaVersion()
154+
155+
// assume this is the timestamp order as well
128156
i := sort.Search(len(h.cache), func(i int) bool {
129-
return h.cache[i].SchemaMetaVersion() <= version
157+
return h.cache[i].infoschema.SchemaMetaVersion() <= version
130158
})
131159

132-
if h.maxUpdatedSnapshotTS < snapshotTS {
133-
h.maxUpdatedSnapshotTS = snapshotTS
134-
}
135-
136160
// cached entry
137-
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
161+
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
162+
if h.cache[i].timestamp > int64(snapshotTS) {
163+
h.cache[i].timestamp = int64(snapshotTS)
164+
}
138165
return true
139166
}
140167

141168
if len(h.cache) < cap(h.cache) {
142169
// has free space, grown the slice
143170
h.cache = h.cache[:len(h.cache)+1]
144171
copy(h.cache[i+1:], h.cache[i:])
145-
h.cache[i] = is
172+
h.cache[i] = schemaAndTimestamp{
173+
infoschema: is,
174+
timestamp: int64(snapshotTS),
175+
}
146176
return true
147177
} else if i < len(h.cache) {
148178
// drop older schema
149179
copy(h.cache[i+1:], h.cache[i:])
150-
h.cache[i] = is
180+
h.cache[i] = schemaAndTimestamp{
181+
infoschema: is,
182+
timestamp: int64(snapshotTS),
183+
}
151184
return true
152185
}
153186
// older than all cached schemas, refuse to cache it

infoschema/cache_test.go

+61-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestInsert(t *testing.T) {
4242
ic.Insert(is5, 5)
4343
require.Equal(t, is5, ic.GetByVersion(5))
4444
require.Equal(t, is2, ic.GetByVersion(2))
45-
require.Nil(t, ic.GetBySnapshotTS(2))
45+
require.Equal(t, is2, ic.GetBySnapshotTS(2))
4646
require.Equal(t, is5, ic.GetBySnapshotTS(10))
4747

4848
// older
@@ -59,7 +59,7 @@ func TestInsert(t *testing.T) {
5959
require.Equal(t, is5, ic.GetByVersion(5))
6060
require.Equal(t, is2, ic.GetByVersion(2))
6161
require.Nil(t, ic.GetByVersion(0))
62-
require.Nil(t, ic.GetBySnapshotTS(2))
62+
require.Equal(t, is2, ic.GetBySnapshotTS(2))
6363
require.Equal(t, is6, ic.GetBySnapshotTS(10))
6464

6565
// replace 2, drop 2
@@ -91,7 +91,7 @@ func TestInsert(t *testing.T) {
9191
require.Nil(t, ic.GetByVersion(2))
9292
require.Nil(t, ic.GetByVersion(0))
9393
require.Nil(t, ic.GetBySnapshotTS(2))
94-
require.Nil(t, ic.GetBySnapshotTS(5))
94+
require.Equal(t, is5, ic.GetBySnapshotTS(5))
9595
require.Equal(t, is6, ic.GetBySnapshotTS(10))
9696
}
9797

@@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) {
129129
ic.Insert(is0, 0)
130130
require.Equal(t, is2, ic.GetLatest())
131131
}
132+
133+
func TestGetByTimestamp(t *testing.T) {
134+
ic := infoschema.NewCache(16)
135+
require.NotNil(t, ic)
136+
require.Nil(t, ic.GetLatest())
137+
138+
is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
139+
ic.Insert(is1, 1)
140+
require.Equal(t, is1, ic.GetLatest())
141+
_, err := ic.GetSchemaByTimestamp(0)
142+
require.NotNil(t, err)
143+
schema, err := ic.GetSchemaByTimestamp(1)
144+
require.Nil(t, err)
145+
require.Equal(t, int64(1), schema.SchemaMetaVersion())
146+
require.Equal(t, is1, ic.GetBySnapshotTS(1))
147+
schema, err = ic.GetSchemaByTimestamp(2)
148+
require.Nil(t, err)
149+
require.Equal(t, int64(1), schema.SchemaMetaVersion())
150+
require.Equal(t, is1, ic.GetBySnapshotTS(2))
151+
152+
is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
153+
ic.Insert(is2, 2)
154+
require.Equal(t, is2, ic.GetLatest())
155+
_, err = ic.GetSchemaByTimestamp(0)
156+
require.NotNil(t, err)
157+
schema, err = ic.GetSchemaByTimestamp(1)
158+
require.Nil(t, err)
159+
require.Equal(t, int64(1), schema.SchemaMetaVersion())
160+
require.Equal(t, is1, ic.GetBySnapshotTS(1))
161+
schema, err = ic.GetSchemaByTimestamp(2)
162+
require.Nil(t, err)
163+
require.Equal(t, int64(2), schema.SchemaMetaVersion())
164+
require.Equal(t, is2, ic.GetBySnapshotTS(2))
165+
schema, err = ic.GetSchemaByTimestamp(3)
166+
require.Nil(t, err)
167+
require.Equal(t, int64(2), schema.SchemaMetaVersion())
168+
require.Equal(t, is2, ic.GetBySnapshotTS(3))
169+
170+
is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0)
171+
ic.Insert(is0, 0)
172+
require.Equal(t, is2, ic.GetLatest())
173+
schema, err = ic.GetSchemaByTimestamp(0)
174+
require.Nil(t, err)
175+
require.Equal(t, int64(0), schema.SchemaMetaVersion())
176+
require.Equal(t, is0, ic.GetBySnapshotTS(0))
177+
schema, err = ic.GetSchemaByTimestamp(1)
178+
require.Nil(t, err)
179+
require.Equal(t, int64(1), schema.SchemaMetaVersion())
180+
require.Equal(t, is1, ic.GetBySnapshotTS(1))
181+
schema, err = ic.GetSchemaByTimestamp(2)
182+
require.Nil(t, err)
183+
require.Equal(t, int64(2), schema.SchemaMetaVersion())
184+
require.Equal(t, is2, ic.GetBySnapshotTS(2))
185+
schema, err = ic.GetSchemaByTimestamp(3)
186+
require.Nil(t, err)
187+
require.Equal(t, int64(2), schema.SchemaMetaVersion())
188+
require.Equal(t, is2, ic.GetBySnapshotTS(3))
189+
}

0 commit comments

Comments
 (0)