Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* cache schema version by timestamp

* fix test

* avoid recursive locking

* address comment

* optimize the code

* fix 40740

* use the commit ts of the shcema diff as the timestamp of the schema

* address comment

* address comment

* address comment

* fix CI

Signed-off-by: xhe <xw897002528@gmail.com>

* fix CI

Signed-off-by: xhe <xw897002528@gmail.com>

* Update meta/meta.go

* Update domain/domain.go

* fix test

---------

Signed-off-by: xhe <xw897002528@gmail.com>
Co-authored-by: xhe <xw897002528@gmail.com>
Co-authored-by: CbcWestwolf <1004626265@qq.com>

fix wrong timestamp in schema cache

address comment

format

Co-authored-by: Chen Ding <dingc05@gmail.com>
  • Loading branch information
2 people authored and GitHub Enterprise committed Aug 29, 2023
1 parent f66959f commit a62fcb7
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 59 deletions.
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//sessionctx/variable",
"//statistics",
"//statistics/handle",
"//store/helper",
"//telemetry",
"//ttl/ttlworker",
"//types",
Expand Down
33 changes: 30 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -86,7 +87,7 @@ func NewMockDomain() *Domain {
do := &Domain{
infoCache: infoschema.NewCache(1),
}
do.infoCache.Insert(infoschema.MockInfoSchema(nil), 1)
do.infoCache.Insert(infoschema.MockInfoSchema(nil), 0)
return do
}

Expand Down Expand Up @@ -177,8 +178,17 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if err != nil {
return nil, false, 0, nil, err
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion)
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
}

if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil {
// try to insert here as well to correct the schemaTs if previous is wrong
// the insert method check if schemaTs is zero
do.infoCache.Insert(is, uint64(schemaTs))
return is, true, 0, nil, nil
}

Expand All @@ -197,7 +207,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
do.infoCache.Insert(is, startTS)
do.infoCache.Insert(is, uint64(schemaTs))
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
Expand Down Expand Up @@ -230,10 +240,27 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Duration("start time", time.Since(startTime)))

is := newISBuilder.Build()
do.infoCache.Insert(is, startTS)
do.infoCache.Insert(is, uint64(schemaTs))
return is, false, currentSchemaVersion, nil, nil
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
helper := helper.NewHelper(tikvStore)
data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version))
if err != nil {
return 0, err
}
if data == nil || data.Info == nil || len(data.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
}
return 0, errors.Errorf("cannot get store from domain")
}

func (do *Domain) sysFacHack() (pools.Resource, error) {
// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
Expand Down
114 changes: 84 additions & 30 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"

"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

var (
Expand All @@ -36,15 +38,27 @@ var (
// It only promised to cache the infoschema, if it is newer than all the cached.
type InfoCache struct {
mu sync.RWMutex
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// record SnapshotTS of the latest schema Insert.
maxUpdatedSnapshotTS uint64
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp
}

type schemaAndTimestamp struct {
infoschema InfoSchema
timestamp int64
}

// NewCache creates a new InfoCache.
func NewCache(capcity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capcity)}
func NewCache(capacity int) *InfoCache {
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]schemaAndTimestamp, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -54,18 +68,49 @@ func (h *InfoCache) GetLatest() InfoSchema {
getLatestCounter.Inc()
if len(h.cache) > 0 {
hitLatestCounter.Inc()
return h.cache[0]
return h.cache[0].infoschema
}
return nil
}

// Len returns the size of the cache
func (h *InfoCache) Len() int {
return len(h.cache)
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
// this is ok because the size of h.cache is small (currently set to 16)
// moreover, the most likely hit element in the array is the first one in steady mode
// thus it may have better performance than binary search
for i, is := range h.cache {
if is.timestamp == 0 || (i > 0 && h.cache[i-1].infoschema.SchemaMetaVersion() != is.infoschema.SchemaMetaVersion()+1) {
// the schema version doesn't have a timestamp or there is a gap in the schema cache
// ignore all the schema cache equals or less than this version in search by timestamp
break
}
if ts >= uint64(is.timestamp) {
// found the largest version before the given ts
return is.infoschema, true
}
}

logutil.BgLogger().Debug("SCHEMA CACHE no schema found")
return nil, false
}

// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getByVersionNoLock(version)
}

func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
getVersionCounter.Inc()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

// `GetByVersion` is allowed to load the latest schema that is less than argument `version`.
Expand All @@ -86,63 +131,72 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema {
// }
// ```

if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) {
if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
hitVersionCounter.Inc()
return h.cache[i]
return h.cache[i].infoschema
}
return nil
}

// GetBySnapshotTS gets the information schema based on snapshotTS.
// If the snapshotTS is new than maxUpdatedSnapshotTS, that's mean it can directly use
// the latest infoschema. otherwise, will return nil.
// It searches the schema cache and find the schema with max schema ts that equals or smaller than given snapshot ts
// Where the schema ts is the commitTs of the txn creates the schema diff
func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()

getTSCounter.Inc()
if snapshotTS >= h.maxUpdatedSnapshotTS {
if len(h.cache) > 0 {
hitTSCounter.Inc()
return h.cache[0]
}
if schema, ok := h.getSchemaByTimestampNoLock(snapshotTS); ok {
hitTSCounter.Inc()
return schema
}
return nil
}

// Insert will **TRY** to insert the infoschema into the cache.
// It only promised to cache the newest infoschema.
// It returns 'true' if it is cached, 'false' otherwise.
func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
// schemaTs is the commitTs of the txn creates the schema diff, which indicates since when the schema version is taking effect
func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
logutil.BgLogger().Debug("INSERT SCHEMA", zap.Uint64("schema ts", schemaTS), zap.Int64("schema version", is.SchemaMetaVersion()))
h.mu.Lock()
defer h.mu.Unlock()

version := is.SchemaMetaVersion()

// assume this is the timestamp order as well
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

if h.maxUpdatedSnapshotTS < snapshotTS {
h.maxUpdatedSnapshotTS = snapshotTS
}

// cached entry
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
// update timestamp if it is not 0 and cached one is 0
if schemaTS > 0 && h.cache[i].timestamp == 0 {
h.cache[i].timestamp = int64(schemaTS)
}
return true
}

if len(h.cache) < cap(h.cache) {
// has free space, grown the slice
h.cache = h.cache[:len(h.cache)+1]
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
return true
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(schemaTS),
}
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
return true
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(schemaTS),
}
} else {
// older than all cached schemas, refuse to cache it
return false
}
// older than all cached schemas, refuse to cache it
return false

return true
}
51 changes: 50 additions & 1 deletion infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestInsert(t *testing.T) {
ic.Insert(is5, 5)
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
// there is a gap in schema cache, so don't use this version
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(10))

Expand All @@ -59,7 +60,9 @@ func TestInsert(t *testing.T) {
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
// there is a gap in schema cache, so don't use this version
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))

// replace 2, drop 2
Expand Down Expand Up @@ -91,7 +94,7 @@ func TestInsert(t *testing.T) {
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Nil(t, ic.GetBySnapshotTS(5))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))
}

Expand Down Expand Up @@ -129,3 +132,49 @@ func TestGetLatest(t *testing.T) {
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
}

func TestGetByTimestamp(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Nil(t, ic.GetLatest())
require.Equal(t, 0, ic.Len())

is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
require.Nil(t, ic.GetBySnapshotTS(0))
require.Equal(t, is1, ic.GetBySnapshotTS(1))
require.Equal(t, is1, ic.GetBySnapshotTS(2))
require.Equal(t, 1, ic.Len())

is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3)
ic.Insert(is3, 3)
require.Equal(t, is3, ic.GetLatest())
require.Nil(t, ic.GetBySnapshotTS(0))
// there is a gap, no schema returned for ts 2
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, is3, ic.GetBySnapshotTS(4))
require.Equal(t, 2, ic.Len())

is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
// schema version 2 doesn't have timestamp set
// thus all schema before ver 2 cannot be searched by timestamp anymore
// because the ts of ver 2 is not accurate
ic.Insert(is2, 0)
require.Equal(t, is3, ic.GetLatest())
require.Nil(t, ic.GetBySnapshotTS(0))
require.Nil(t, ic.GetBySnapshotTS(1))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, is3, ic.GetBySnapshotTS(4))
require.Equal(t, 3, ic.Len())

// insert is2 again with correct timestamp, to correct previous wrong timestamp
ic.Insert(is2, 2)
require.Equal(t, is3, ic.GetLatest())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, 3, ic.Len())

}
Loading

0 comments on commit a62fcb7

Please sign in to comment.