diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 6c8eab47eb356..42ec8f3cbfd43 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -452,6 +452,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. // It is safe to skip the empty diff because the infoschema is new enough and consistent. + logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion)) + do.infoCache.InsertEmptySchemaVersion(usedVersion) continue } diffs = append(diffs, diff) diff --git a/pkg/infoschema/cache.go b/pkg/infoschema/cache.go index f3d4d67e576c6..2d5355cb4fe5a 100644 --- a/pkg/infoschema/cache.go +++ b/pkg/infoschema/cache.go @@ -32,6 +32,9 @@ type InfoCache struct { // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp + // emptySchemaVersions stores schema version which has no schema_diff. + emptySchemaVersions map[int64]struct{} + r autoid.Requirement Data *Data } @@ -45,9 +48,10 @@ type schemaAndTimestamp struct { func NewCache(r autoid.Requirement, capacity int) *InfoCache { infoData := NewData() return &InfoCache{ - cache: make([]schemaAndTimestamp, 0, capacity), - r: r, - Data: infoData, + cache: make([]schemaAndTimestamp, 0, capacity), + emptySchemaVersions: make(map[int64]struct{}), + r: r, + Data: infoData, } } @@ -99,6 +103,11 @@ func (h *InfoCache) Len() int { return len(h.cache) } +// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing. +func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} { + return h.emptySchemaVersions +} + func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) // search one by one instead of binary search, because the timestamp of a schema could be 0 @@ -116,11 +125,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { // the first element is the latest schema, so we can return it directly. return is.infoschema, true } - if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts { - // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, - // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts. - // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. - return is.infoschema, true + + if uint64(h.cache[i-1].timestamp) > ts { + // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. + lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion() + currentVersion := is.infoschema.SchemaMetaVersion() + if lastVersion == currentVersion+1 { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts. + return is.infoschema, true + } + if lastVersion > currentVersion { + found := true + for ver := currentVersion + 1; ver < lastVersion; ver++ { + _, ok := h.emptySchemaVersions[ver] + if !ok { + found = false + break + } + } + if found { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and + // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions + // which means those gap versions don't have schema info, then current schema is also suitable for ts. + return is.infoschema, true + } + } } // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. break @@ -231,3 +261,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { return true } + +// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version. +func (h *InfoCache) InsertEmptySchemaVersion(version int64) { + h.mu.Lock() + defer h.mu.Unlock() + + h.emptySchemaVersions[version] = struct{}{} + if len(h.emptySchemaVersions) > cap(h.cache) { + // remove oldest version. + versions := make([]int64, 0, len(h.emptySchemaVersions)) + for ver := range h.emptySchemaVersions { + versions = append(versions, ver) + } + sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] }) + for _, ver := range versions { + delete(h.emptySchemaVersions, ver) + if len(h.emptySchemaVersions) <= cap(h.cache) { + break + } + } + } +} diff --git a/pkg/infoschema/test/cachetest/BUILD.bazel b/pkg/infoschema/test/cachetest/BUILD.bazel index 0f60ab6a233ac..8fe7aa7f379e8 100644 --- a/pkg/infoschema/test/cachetest/BUILD.bazel +++ b/pkg/infoschema/test/cachetest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 7, + shard_count = 8, deps = [ "//pkg/infoschema", "//pkg/testkit/testsetup", diff --git a/pkg/infoschema/test/cachetest/cache_test.go b/pkg/infoschema/test/cachetest/cache_test.go index 4f9f9249a3f66..5823bd27c7887 100644 --- a/pkg/infoschema/test/cachetest/cache_test.go +++ b/pkg/infoschema/test/cachetest/cache_test.go @@ -290,4 +290,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) { checkFn(1, 84, false) checkFn(85, 100, true) require.Equal(t, 16, ic.Size()) + + // Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff. + ic = infoschema.NewCache(nil, 16) + require.NotNil(t, ic) + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 10, true) + // mock for schema version hole, schema-version 9 is missing. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10) + checkFn(1, 7, true) + // without empty schema version map, get snapshot by ts 8, 9 will both failed. + checkFn(8, 9, false) + checkFn(10, 10, true) + // add empty schema version 9. + ic.InsertEmptySchemaVersion(9) + // after set empty schema version, get snapshot by ts 8, 9 will both success. + checkFn(1, 8, true) + checkFn(10, 10, true) + is := ic.GetBySnapshotTS(uint64(9)) + require.NotNil(t, is) + // since schema version 9 is empty, so get by ts 9 will get schema which version is 8. + require.Equal(t, int64(8), is.SchemaMetaVersion()) +} + +func TestCacheEmptySchemaVersion(t *testing.T) { + ic := infoschema.NewCache(nil, 16) + require.NotNil(t, ic) + require.Equal(t, 0, len(ic.GetEmptySchemaVersions())) + for i := 0; i < 16; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions := ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 0; i < 16; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } + for i := 16; i < 20; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions = ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 4; i < 20; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } }