Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: fix issue of information schema cache miss cause by schema version gap #53445

Merged
merged 5 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
if diff == nil {
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
// It is safe to skip the empty diff because the infoschema is new enough and consistent.
logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion))
do.infoCache.InsertEmptySchemaVersion(usedVersion)
continue
}
diffs = append(diffs, diff)
Expand Down
68 changes: 60 additions & 8 deletions pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type InfoCache struct {
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp

// emptySchemaVersions stores schema version which has no schema_diff.
emptySchemaVersions map[int64]struct{}

r autoid.Requirement
Data *Data
}
Expand All @@ -45,9 +48,10 @@ type schemaAndTimestamp struct {
func NewCache(r autoid.Requirement, capacity int) *InfoCache {
infoData := NewData()
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
r: r,
Data: infoData,
cache: make([]schemaAndTimestamp, 0, capacity),
emptySchemaVersions: make(map[int64]struct{}),
r: r,
Data: infoData,
}
}

Expand Down Expand Up @@ -100,6 +104,11 @@ func (h *InfoCache) Len() int {
return len(h.cache)
}

// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} {
return h.emptySchemaVersions
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
Expand All @@ -117,11 +126,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
// the first element is the latest schema, so we can return it directly.
return is.infoschema, true
}
if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts {
// This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
// but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts.
// The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
return is.infoschema, true

if uint64(h.cache[i-1].timestamp) > ts {
// The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion()
currentVersion := is.infoschema.SchemaMetaVersion()
if lastVersion == currentVersion+1 {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
// but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
return is.infoschema, true
}
if lastVersion > currentVersion {
found := true
for ver := currentVersion + 1; ver < lastVersion; ver++ {
_, ok := h.emptySchemaVersions[ver]
if !ok {
found = false
break
}
}
if found {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
// current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
// which means those gap versions don't have schema info, then current schema is also suitable for ts.
return is.infoschema, true
}
}
}
// current schema is not suitable for ts, then break the loop to avoid the unnecessary search.
break
Expand Down Expand Up @@ -241,3 +271,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {

return true
}

// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
h.mu.Lock()
defer h.mu.Unlock()

h.emptySchemaVersions[version] = struct{}{}
if len(h.emptySchemaVersions) > cap(h.cache) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
// remove oldest version.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without statistics of how those version are accessed, remove oldest is no better than remove random version, and more complex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For stale-read query workload, it will always need recent schema version, so remove the oldest version is better.

versions := make([]int64, 0, len(h.emptySchemaVersions))
for ver := range h.emptySchemaVersions {
versions = append(versions, ver)
}
sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
for _, ver := range versions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is removing one version enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because empty schema version rarely appears, I think it's okay to delete just one here.

delete(h.emptySchemaVersions, ver)
if len(h.emptySchemaVersions) <= cap(h.cache) {
break
}
}
}
}
2 changes: 1 addition & 1 deletion pkg/infoschema/test/cachetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/infoschema",
"//pkg/testkit/testsetup",
Expand Down
47 changes: 47 additions & 0 deletions pkg/infoschema/test/cachetest/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) {
checkFn(1, 84, false)
checkFn(85, 100, true)
require.Equal(t, 16, ic.Size())

// Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff.
ic = infoschema.NewCache(nil, 16)
require.NotNil(t, ic)
for i := 1; i <= 8; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 10, true)
// mock for schema version hole, schema-version 9 is missing.
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10)
checkFn(1, 7, true)
// without empty schema version map, get snapshot by ts 8, 9 will both failed.
checkFn(8, 9, false)
checkFn(10, 10, true)
// add empty schema version 9.
ic.InsertEmptySchemaVersion(9)
// after set empty schema version, get snapshot by ts 8, 9 will both success.
checkFn(1, 8, true)
checkFn(10, 10, true)
is := ic.GetBySnapshotTS(uint64(9))
require.NotNil(t, is)
// since schema version 9 is empty, so get by ts 9 will get schema which version is 8.
require.Equal(t, int64(8), is.SchemaMetaVersion())
}

func TestCacheEmptySchemaVersion(t *testing.T) {
ic := infoschema.NewCache(nil, 16)
require.NotNil(t, ic)
require.Equal(t, 0, len(ic.GetEmptySchemaVersions()))
for i := 0; i < 16; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions := ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 0; i < 16; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
for i := 16; i < 20; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions = ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 4; i < 20; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
}