Skip to content

Commit

Permalink
infoschema: fix issue of information schema cache miss cause by schem…
Browse files Browse the repository at this point in the history
…a version gap (#53445) (#53562)

* infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)

close #53428

Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 authored May 28, 2024
1 parent 4be26db commit a62856d
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 14 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:M0DudF9JLF1GbmdoxM7LK8pNaiLAReHihdk5nwnB8pg=",
version = "v2.0.4-0.20240125040124-4c64e5d5d57d",
sum = "h1:NXhdhxU4dibxCmMChAOpyBXzuqi2VKp17nSx1HUg4HU=",
version = "v2.0.4-0.20240521070200-f9fbc4c8f578",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
2 changes: 1 addition & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:1, total_time:1s}}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
Expand Down
17 changes: 11 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 3. There are less 100 diffs.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
loadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, uint64(schemaTs))
Expand All @@ -226,7 +226,8 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS),
zap.Uint64s("actionTypes", relatedChanges.ActionTypes))
zap.Uint64s("actionTypes", relatedChanges.ActionTypes),
zap.Strings("diffTypes", diffTypes))
return is, false, currentSchemaVersion, relatedChanges, nil
}
// We can fall back to full load, don't need to return the error.
Expand Down Expand Up @@ -369,17 +370,19 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta,
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) {
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
diff, err := m.GetSchemaDiff(usedVersion)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if diff == nil {
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
// It is safe to skip the empty diff because the infoschema is new enough and consistent.
logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion))
do.infoCache.InsertEmptySchemaVersion(usedVersion)
continue
}
diffs = append(diffs, diff)
Expand All @@ -388,14 +391,16 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
builder.SetDeltaUpdateBundles()
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
diffTypes := make([]string, 0, len(diffs))
for _, diff := range diffs {
IDs, err := builder.ApplyDiff(m, diff)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
}
diffTypes = append(diffTypes, diff.Type.String())
phyTblIDs = append(phyTblIDs, IDs...)
for i := 0; i < len(IDs); i++ {
actions = append(actions, uint64(1<<diff.Type))
Expand All @@ -406,7 +411,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
return is, &relatedChange, nil
return is, &relatedChange, diffTypes, nil
}

func canSkipSchemaCheckerDDL(tp model.ActionType) bool {
Expand Down
61 changes: 58 additions & 3 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type InfoCache struct {
mu sync.RWMutex
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp

// emptySchemaVersions stores schema version which has no schema_diff.
emptySchemaVersions map[int64]struct{}
}

type schemaAndTimestamp struct {
Expand All @@ -50,7 +53,8 @@ type schemaAndTimestamp struct {
// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
cache: make([]schemaAndTimestamp, 0, capacity),
emptySchemaVersions: make(map[int64]struct{}),
}
}

Expand Down Expand Up @@ -102,6 +106,11 @@ func (h *InfoCache) Len() int {
return len(h.cache)
}

// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} {
return h.emptySchemaVersions
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
Expand All @@ -115,8 +124,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
if i == 0 {
return is.infoschema, true
}
if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts {
return is.infoschema, true

if uint64(h.cache[i-1].timestamp) > ts {
// The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion()
currentVersion := is.infoschema.SchemaMetaVersion()
if lastVersion == currentVersion+1 {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
// but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
return is.infoschema, true
}
if lastVersion > currentVersion {
found := true
for ver := currentVersion + 1; ver < lastVersion; ver++ {
_, ok := h.emptySchemaVersions[ver]
if !ok {
found = false
break
}
}
if found {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
// current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
// which means those gap versions don't have schema info, then current schema is also suitable for ts.
return is.infoschema, true
}
}
}
break
}
Expand Down Expand Up @@ -225,3 +258,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {

return true
}

// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
h.mu.Lock()
defer h.mu.Unlock()

h.emptySchemaVersions[version] = struct{}{}
if len(h.emptySchemaVersions) > cap(h.cache) {
// remove oldest version.
versions := make([]int64, 0, len(h.emptySchemaVersions))
for ver := range h.emptySchemaVersions {
versions = append(versions, ver)
}
sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
for _, ver := range versions {
delete(h.emptySchemaVersions, ver)
if len(h.emptySchemaVersions) <= cap(h.cache) {
break
}
}
}
}
47 changes: 47 additions & 0 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) {
checkFn(1, 84, false)
checkFn(85, 100, true)
require.Equal(t, 16, ic.Size())

// Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff.
ic = infoschema.NewCache(16)
require.NotNil(t, ic)
for i := 1; i <= 8; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 10, true)
// mock for schema version hole, schema-version 9 is missing.
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10)
checkFn(1, 7, true)
// without empty schema version map, get snapshot by ts 8, 9 will both failed.
checkFn(8, 9, false)
checkFn(10, 10, true)
// add empty schema version 9.
ic.InsertEmptySchemaVersion(9)
// after set empty schema version, get snapshot by ts 8, 9 will both success.
checkFn(1, 8, true)
checkFn(10, 10, true)
is := ic.GetBySnapshotTS(uint64(9))
require.NotNil(t, is)
// since schema version 9 is empty, so get by ts 9 will get schema which version is 8.
require.Equal(t, int64(8), is.SchemaMetaVersion())
}

func TestCacheEmptySchemaVersion(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Equal(t, 0, len(ic.GetEmptySchemaVersions()))
for i := 0; i < 16; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions := ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 0; i < 16; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
for i := 16; i < 20; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions = ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 4; i < 20; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
}
4 changes: 2 additions & 2 deletions tests/realtikvtest/sessiontest/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4).*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4|5).*", explain)

// Test for tikv_client_read_timeout session variable.
tk.MustExec("set @@tikv_client_read_timeout=1;")
Expand Down Expand Up @@ -282,5 +282,5 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@stale_read_ts_var) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, *num_rpc:(3|4).*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:(3|4|5).*", explain)
}
1 change: 1 addition & 0 deletions util/logutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"@com_github_opentracing_opentracing_go//log",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//buffer",
"@org_uber_go_zap//zapcore",
Expand Down

0 comments on commit a62856d

Please sign in to comment.