From 339d391fafa0ccb7d2bc09eef57d3f3613050e9c Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 27 May 2024 12:05:49 +0800 Subject: [PATCH 1/5] infoschema: fix issue of information schema cache miss cause by schema version gap (#53445) close pingcap/tidb#53428 Signed-off-by: crazycs520 --- domain/domain.go | 2 ++ infoschema/cache.go | 61 ++++++++++++++++++++++++++++++++++++++-- infoschema/cache_test.go | 47 +++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index d1a445345cc9b..dc113fc34da9f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -380,6 +380,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. // It is safe to skip the empty diff because the infoschema is new enough and consistent. + logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion)) + do.infoCache.InsertEmptySchemaVersion(usedVersion) continue } diffs = append(diffs, diff) diff --git a/infoschema/cache.go b/infoschema/cache.go index 1005ad4354424..3ca2f3828e7b2 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -40,6 +40,9 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp + + // emptySchemaVersions stores schema version which has no schema_diff. + emptySchemaVersions map[int64]struct{} } type schemaAndTimestamp struct { @@ -50,7 +53,8 @@ type schemaAndTimestamp struct { // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { return &InfoCache{ - cache: make([]schemaAndTimestamp, 0, capacity), + cache: make([]schemaAndTimestamp, 0, capacity), + emptySchemaVersions: make(map[int64]struct{}), } } @@ -102,6 +106,11 @@ func (h *InfoCache) Len() int { return len(h.cache) } +// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing. +func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} { + return h.emptySchemaVersions +} + func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) // search one by one instead of binary search, because the timestamp of a schema could be 0 @@ -115,8 +124,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { if i == 0 { return is.infoschema, true } - if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts { - return is.infoschema, true + + if uint64(h.cache[i-1].timestamp) > ts { + // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. + lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion() + currentVersion := is.infoschema.SchemaMetaVersion() + if lastVersion == currentVersion+1 { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts. + return is.infoschema, true + } + if lastVersion > currentVersion { + found := true + for ver := currentVersion + 1; ver < lastVersion; ver++ { + _, ok := h.emptySchemaVersions[ver] + if !ok { + found = false + break + } + } + if found { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and + // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions + // which means those gap versions don't have schema info, then current schema is also suitable for ts. + return is.infoschema, true + } + } } break } @@ -225,3 +258,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { return true } + +// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version. +func (h *InfoCache) InsertEmptySchemaVersion(version int64) { + h.mu.Lock() + defer h.mu.Unlock() + + h.emptySchemaVersions[version] = struct{}{} + if len(h.emptySchemaVersions) > cap(h.cache) { + // remove oldest version. + versions := make([]int64, 0, len(h.emptySchemaVersions)) + for ver := range h.emptySchemaVersions { + versions = append(versions, ver) + } + sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] }) + for _, ver := range versions { + delete(h.emptySchemaVersions, ver) + if len(h.emptySchemaVersions) <= cap(h.cache) { + break + } + } + } +} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 7cc3126344a47..1290ee39544c3 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -290,4 +290,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) { checkFn(1, 84, false) checkFn(85, 100, true) require.Equal(t, 16, ic.Size()) + + // Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff. + ic = infoschema.NewCache(16) + require.NotNil(t, ic) + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 10, true) + // mock for schema version hole, schema-version 9 is missing. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10) + checkFn(1, 7, true) + // without empty schema version map, get snapshot by ts 8, 9 will both failed. + checkFn(8, 9, false) + checkFn(10, 10, true) + // add empty schema version 9. + ic.InsertEmptySchemaVersion(9) + // after set empty schema version, get snapshot by ts 8, 9 will both success. + checkFn(1, 8, true) + checkFn(10, 10, true) + is := ic.GetBySnapshotTS(uint64(9)) + require.NotNil(t, is) + // since schema version 9 is empty, so get by ts 9 will get schema which version is 8. + require.Equal(t, int64(8), is.SchemaMetaVersion()) +} + +func TestCacheEmptySchemaVersion(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Equal(t, 0, len(ic.GetEmptySchemaVersions())) + for i := 0; i < 16; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions := ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 0; i < 16; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } + for i := 16; i < 20; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions = ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 4; i < 20; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } } From a9314f255d282ec14068c2fee832cb4ef05d47fa Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 27 May 2024 14:05:22 +0800 Subject: [PATCH 2/5] make bazel_prepare Signed-off-by: crazycs520 --- DEPS.bzl | 4 ++-- util/logutil/BUILD.bazel | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index d31f30b6eed03..7695276e877bc 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3703,8 +3703,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:M0DudF9JLF1GbmdoxM7LK8pNaiLAReHihdk5nwnB8pg=", - version = "v2.0.4-0.20240125040124-4c64e5d5d57d", + sum = "h1:NXhdhxU4dibxCmMChAOpyBXzuqi2VKp17nSx1HUg4HU=", + version = "v2.0.4-0.20240521070200-f9fbc4c8f578", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/util/logutil/BUILD.bazel b/util/logutil/BUILD.bazel index 3abf806e9290b..5b3bf9d7a8da1 100644 --- a/util/logutil/BUILD.bazel +++ b/util/logutil/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "@com_github_opentracing_opentracing_go//log", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//:zap", "@org_uber_go_zap//buffer", "@org_uber_go_zap//zapcore", From ca69fe268226edf16c2c835654be353a20ac7ce1 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 27 May 2024 14:42:29 +0800 Subject: [PATCH 3/5] domain: improve information log information (#47810) Signed-off-by: crazycs520 --- domain/domain.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index dc113fc34da9f..30733b9512793 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -217,7 +217,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // 3. There are less 100 diffs. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { - is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) + is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) if err == nil { loadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds()) do.infoCache.Insert(is, uint64(schemaTs)) @@ -226,7 +226,8 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Duration("start time", time.Since(startTime)), zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS), - zap.Uint64s("actionTypes", relatedChanges.ActionTypes)) + zap.Uint64s("actionTypes", relatedChanges.ActionTypes), + zap.Strings("diffTypes", diffTypes)) return is, false, currentSchemaVersion, relatedChanges, nil } // We can fall back to full load, don't need to return the error. @@ -369,13 +370,13 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) { +func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ diff, err := m.GetSchemaDiff(usedVersion) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. @@ -390,14 +391,16 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 builder.SetDeltaUpdateBundles() phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) + diffTypes := make([]string, 0, len(diffs)) for _, diff := range diffs { IDs, err := builder.ApplyDiff(m, diff) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if canSkipSchemaCheckerDDL(diff.Type) { continue } + diffTypes = append(diffTypes, diff.Type.String()) phyTblIDs = append(phyTblIDs, IDs...) for i := 0; i < len(IDs); i++ { actions = append(actions, uint64(1< Date: Mon, 27 May 2024 16:17:42 +0800 Subject: [PATCH 4/5] fix test Signed-off-by: crazycs520 --- tests/realtikvtest/sessiontest/session_fail_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/realtikvtest/sessiontest/session_fail_test.go b/tests/realtikvtest/sessiontest/session_fail_test.go index 1311ed10e88f0..87cc9592abe1c 100644 --- a/tests/realtikvtest/sessiontest/session_fail_test.go +++ b/tests/realtikvtest/sessiontest/session_fail_test.go @@ -254,7 +254,7 @@ func TestTiKVClientReadTimeout(t *testing.T) { rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows() require.Len(t, rows, 3) explain = fmt.Sprintf("%v", rows[0]) - require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4).*", explain) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4|5).*", explain) // Test for tikv_client_read_timeout session variable. tk.MustExec("set @@tikv_client_read_timeout=1;") @@ -282,5 +282,5 @@ func TestTiKVClientReadTimeout(t *testing.T) { rows = tk.MustQuery("explain analyze select * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows() require.Len(t, rows, 3) explain = fmt.Sprintf("%v", rows[0]) - require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, *num_rpc:(3|4).*", explain) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4|5).*", explain) } From 984c0f25396a43031d5d0232d3f83c22a56eadd3 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 27 May 2024 16:47:01 +0800 Subject: [PATCH 5/5] fix test Signed-off-by: crazycs520 --- distsql/distsql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index b462546711ac0..514dd3c4df963 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -137,7 +137,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { } stmtStats.RegisterStats(2, s1) stats = stmtStats.GetRootStats(2) - expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" + expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:1, total_time:1s}}, backoff{RegionMiss: 1ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String())