Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: fix issue of information schema cache miss cause by schema version gap (#53445) #53562

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:M0DudF9JLF1GbmdoxM7LK8pNaiLAReHihdk5nwnB8pg=",
version = "v2.0.4-0.20240125040124-4c64e5d5d57d",
sum = "h1:NXhdhxU4dibxCmMChAOpyBXzuqi2VKp17nSx1HUg4HU=",
version = "v2.0.4-0.20240521070200-f9fbc4c8f578",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
17 changes: 11 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 3. There are less 100 diffs.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
loadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, uint64(schemaTs))
Expand All @@ -226,7 +226,8 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS),
zap.Uint64s("actionTypes", relatedChanges.ActionTypes))
zap.Uint64s("actionTypes", relatedChanges.ActionTypes),
zap.Strings("diffTypes", diffTypes))
return is, false, currentSchemaVersion, relatedChanges, nil
}
// We can fall back to full load, don't need to return the error.
Expand Down Expand Up @@ -369,17 +370,19 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta,
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) {
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
diff, err := m.GetSchemaDiff(usedVersion)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if diff == nil {
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
// It is safe to skip the empty diff because the infoschema is new enough and consistent.
logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion))
do.infoCache.InsertEmptySchemaVersion(usedVersion)
continue
}
diffs = append(diffs, diff)
Expand All @@ -388,14 +391,16 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
builder.SetDeltaUpdateBundles()
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
diffTypes := make([]string, 0, len(diffs))
for _, diff := range diffs {
IDs, err := builder.ApplyDiff(m, diff)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
}
diffTypes = append(diffTypes, diff.Type.String())
phyTblIDs = append(phyTblIDs, IDs...)
for i := 0; i < len(IDs); i++ {
actions = append(actions, uint64(1<<diff.Type))
Expand All @@ -406,7 +411,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
return is, &relatedChange, nil
return is, &relatedChange, diffTypes, nil
}

func canSkipSchemaCheckerDDL(tp model.ActionType) bool {
Expand Down
61 changes: 58 additions & 3 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type InfoCache struct {
mu sync.RWMutex
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp

// emptySchemaVersions stores schema version which has no schema_diff.
emptySchemaVersions map[int64]struct{}
}

type schemaAndTimestamp struct {
Expand All @@ -50,7 +53,8 @@ type schemaAndTimestamp struct {
// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
cache: make([]schemaAndTimestamp, 0, capacity),
emptySchemaVersions: make(map[int64]struct{}),
}
}

Expand Down Expand Up @@ -102,6 +106,11 @@ func (h *InfoCache) Len() int {
return len(h.cache)
}

// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} {
return h.emptySchemaVersions
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
Expand All @@ -115,8 +124,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
if i == 0 {
return is.infoschema, true
}
if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts {
return is.infoschema, true

if uint64(h.cache[i-1].timestamp) > ts {
// The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion()
currentVersion := is.infoschema.SchemaMetaVersion()
if lastVersion == currentVersion+1 {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
// but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
return is.infoschema, true
}
if lastVersion > currentVersion {
found := true
for ver := currentVersion + 1; ver < lastVersion; ver++ {
_, ok := h.emptySchemaVersions[ver]
if !ok {
found = false
break
}
}
if found {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
// current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
// which means those gap versions don't have schema info, then current schema is also suitable for ts.
return is.infoschema, true
}
}
}
break
}
Expand Down Expand Up @@ -225,3 +258,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {

return true
}

// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
h.mu.Lock()
defer h.mu.Unlock()

h.emptySchemaVersions[version] = struct{}{}
if len(h.emptySchemaVersions) > cap(h.cache) {
// remove oldest version.
versions := make([]int64, 0, len(h.emptySchemaVersions))
for ver := range h.emptySchemaVersions {
versions = append(versions, ver)
}
sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
for _, ver := range versions {
delete(h.emptySchemaVersions, ver)
if len(h.emptySchemaVersions) <= cap(h.cache) {
break
}
}
}
}
47 changes: 47 additions & 0 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) {
checkFn(1, 84, false)
checkFn(85, 100, true)
require.Equal(t, 16, ic.Size())

// Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff.
ic = infoschema.NewCache(16)
require.NotNil(t, ic)
for i := 1; i <= 8; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 10, true)
// mock for schema version hole, schema-version 9 is missing.
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10)
checkFn(1, 7, true)
// without empty schema version map, get snapshot by ts 8, 9 will both failed.
checkFn(8, 9, false)
checkFn(10, 10, true)
// add empty schema version 9.
ic.InsertEmptySchemaVersion(9)
// after set empty schema version, get snapshot by ts 8, 9 will both success.
checkFn(1, 8, true)
checkFn(10, 10, true)
is := ic.GetBySnapshotTS(uint64(9))
require.NotNil(t, is)
// since schema version 9 is empty, so get by ts 9 will get schema which version is 8.
require.Equal(t, int64(8), is.SchemaMetaVersion())
}

func TestCacheEmptySchemaVersion(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Equal(t, 0, len(ic.GetEmptySchemaVersions()))
for i := 0; i < 16; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions := ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 0; i < 16; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
for i := 16; i < 20; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions = ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 4; i < 20; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
}
1 change: 1 addition & 0 deletions util/logutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"@com_github_opentracing_opentracing_go//log",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//buffer",
"@org_uber_go_zap//zapcore",
Expand Down