Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema cache: record the ddl timestamp and associate schema cache with the timestamp for stale read #42083

Merged
merged 22 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"//sessionctx/sessionstates",
"//sessionctx/variable",
"//statistics/handle",
"//store/helper",
"//telemetry",
"//ttl/ttlworker",
"//types",
Expand Down
30 changes: 27 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewMockDomain() *Domain {
do := &Domain{
infoCache: infoschema.NewCache(1),
}
do.infoCache.Insert(infoschema.MockInfoSchema(nil), 1)
do.infoCache.Insert(infoschema.MockInfoSchema(nil), 0)
return do
}

Expand Down Expand Up @@ -199,6 +200,12 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if err != nil {
return nil, false, 0, nil, err
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion)
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
}

if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil {
return is, true, 0, nil, nil
Expand All @@ -220,7 +227,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
do.infoCache.Insert(is, startTS)
do.infoCache.Insert(is, uint64(schemaTs))
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
Expand Down Expand Up @@ -259,10 +266,27 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Duration("start time", time.Since(startTime)))

is := newISBuilder.Build()
do.infoCache.Insert(is, startTS)
do.infoCache.Insert(is, uint64(schemaTs))
return is, false, currentSchemaVersion, nil, nil
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
helper := helper.NewHelper(tikvStore)
data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version))
if err != nil {
return 0, err
}
if len(data.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
}
return 0, errors.Errorf("cannot get store from domain")
}

func (do *Domain) sysFacHack() (pools.Resource, error) {
// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
Expand Down
102 changes: 72 additions & 30 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,36 @@ import (
"sync"

infoschema_metrics "github.com/pingcap/tidb/infoschema/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// InfoCache handles information schema, including getting and setting.
// The cache behavior, however, is transparent and under automatic management.
// It only promised to cache the infoschema, if it is newer than all the cached.
type InfoCache struct {
mu sync.RWMutex
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// record SnapshotTS of the latest schema Insert.
maxUpdatedSnapshotTS uint64
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp
}

type schemaAndTimestamp struct {
infoschema InfoSchema
timestamp int64
}

// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]InfoSchema, 0, capacity)
h.cache = make([]schemaAndTimestamp, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -51,18 +58,44 @@ func (h *InfoCache) GetLatest() InfoSchema {
infoschema_metrics.GetLatestCounter.Inc()
if len(h.cache) > 0 {
infoschema_metrics.HitLatestCounter.Inc()
return h.cache[0]
return h.cache[0].infoschema
}
return nil
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
// this is ok because the size of h.cache is small (currently set to 16)
// moreover, the most likely hit element in the array is the first one in steady mode
// thus it may have better performance than binary search
for i, is := range h.cache {
if is.timestamp == 0 || (i > 0 && h.cache[i-1].infoschema.SchemaMetaVersion() != is.infoschema.SchemaMetaVersion()+1) {
// the schema version doesn't have a timestamp or there is a gap in the schema cache
// ignore all the schema cache equals or less than this version in search by timestamp
break
}
if ts >= uint64(is.timestamp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it has a assumption that all the history information schemas are synced and no one is lost between two information schemas. But I'm curious about what will happen if one TiDB offline for a while (for example one hour) and then online again because some of information schemas will be lost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! updated to ignore the schema cache if there is a gap in the schema version between the searched schema and the immediate one after it in the cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current check only works if cache hits. What about moving L80-L83 to L78, making it a general check like is.timestamp == 0?

That is, we trust if and only if is.timestamp != 0 and no gap.

// found the largest version before the given ts
return is.infoschema, true
}
}

logutil.BgLogger().Debug("SCHEMA CACHE no schema found")
return nil, false
}

// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getByVersionNoLock(version)
}

func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
infoschema_metrics.GetVersionCounter.Inc()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

// `GetByVersion` is allowed to load the latest schema that is less than argument `version`.
Expand All @@ -83,63 +116,72 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema {
// }
// ```

if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) {
if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
infoschema_metrics.HitVersionCounter.Inc()
return h.cache[i]
return h.cache[i].infoschema
}
return nil
}

// GetBySnapshotTS gets the information schema based on snapshotTS.
// If the snapshotTS is new than maxUpdatedSnapshotTS, that's mean it can directly use
// the latest infoschema. otherwise, will return nil.
// It searches the schema cache and find the schema with max schema ts that equals or smaller than given snapshot ts
// Where the schema ts is the commitTs of the txn creates the schema diff
func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()

infoschema_metrics.GetTSCounter.Inc()
if snapshotTS >= h.maxUpdatedSnapshotTS {
if len(h.cache) > 0 {
infoschema_metrics.HitTSCounter.Inc()
return h.cache[0]
}
if schema, ok := h.getSchemaByTimestampNoLock(snapshotTS); ok {
infoschema_metrics.HitTSCounter.Inc()
return schema
}
return nil
}

// Insert will **TRY** to insert the infoschema into the cache.
// It only promised to cache the newest infoschema.
// It returns 'true' if it is cached, 'false' otherwise.
func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
// schemaTs is the commitTs of the txn creates the schema diff, which indicates since when the schema version is taking effect
func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
logutil.BgLogger().Debug("INSERT SCHEMA", zap.Uint64("schema ts", schemaTS), zap.Int64("schema version", is.SchemaMetaVersion()))
h.mu.Lock()
defer h.mu.Unlock()

version := is.SchemaMetaVersion()

// assume this is the timestamp order as well
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

if h.maxUpdatedSnapshotTS < snapshotTS {
h.maxUpdatedSnapshotTS = snapshotTS
}

// cached entry
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
// update timestamp if it is not 0 and cached one is 0
if schemaTS > 0 && h.cache[i].timestamp == 0 {
h.cache[i].timestamp = int64(schemaTS)
}
return true
}

if len(h.cache) < cap(h.cache) {
// has free space, grown the slice
h.cache = h.cache[:len(h.cache)+1]
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
return true
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(schemaTS),
}
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
return true
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(schemaTS),
}
} else {
// older than all cached schemas, refuse to cache it
return false
}
// older than all cached schemas, refuse to cache it
return false

return true
}
38 changes: 37 additions & 1 deletion infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestInsert(t *testing.T) {
ic.Insert(is5, 5)
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
// there is a gap in schema cache, so don't use this version
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(10))

Expand All @@ -59,7 +60,9 @@ func TestInsert(t *testing.T) {
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
// there is a gap in schema cache, so don't use this version
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))

// replace 2, drop 2
Expand Down Expand Up @@ -91,7 +94,7 @@ func TestInsert(t *testing.T) {
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Nil(t, ic.GetBySnapshotTS(5))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))
}

Expand Down Expand Up @@ -129,3 +132,36 @@ func TestGetLatest(t *testing.T) {
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
}

func TestGetByTimestamp(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Nil(t, ic.GetLatest())

is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
require.Nil(t, ic.GetBySnapshotTS(0))
require.Equal(t, is1, ic.GetBySnapshotTS(1))
require.Equal(t, is1, ic.GetBySnapshotTS(2))

is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3)
ic.Insert(is3, 3)
require.Equal(t, is3, ic.GetLatest())
require.Nil(t, ic.GetBySnapshotTS(0))
// there is a gap, no schema returned for ts 2
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, is3, ic.GetBySnapshotTS(4))

is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
// schema version 2 doesn't have timestamp set
// thus all schema before ver 2 cannot be searched by timestamp anymore
// because the ts of ver 2 is not accurate
ic.Insert(is2, 0)
require.Equal(t, is3, ic.GetLatest())
require.Nil(t, ic.GetBySnapshotTS(0))
require.Nil(t, ic.GetBySnapshotTS(1))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, is3, ic.GetBySnapshotTS(4))
}
6 changes: 6 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,12 @@ func (m *Meta) GetSchemaVersionWithNonEmptyDiff() (int64, error) {
return v, err
}

// EncodeSchemaDiffKey returns the raw kv key for a schema diff
func (m *Meta) EncodeSchemaDiffKey(schemaVersion int64) kv.Key {
diffKey := m.schemaDiffKey(schemaVersion)
return m.txn.EncodeStringDataKey(diffKey)
}

// GetSchemaVersion gets current global schema version.
func (m *Meta) GetSchemaVersion() (int64, error) {
return m.txn.GetInt64(mSchemaVersionKey)
Expand Down
8 changes: 4 additions & 4 deletions structure/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func (t *TxStructure) Set(key []byte, value []byte) error {
if t.readWriter == nil {
return ErrWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)
ek := t.EncodeStringDataKey(key)
return t.readWriter.Set(ek, value)
}

// Get gets the string value of a key.
func (t *TxStructure) Get(key []byte) ([]byte, error) {
ek := t.encodeStringDataKey(key)
ek := t.EncodeStringDataKey(key)
value, err := t.reader.Get(context.TODO(), ek)
if kv.ErrNotExist.Equal(err) {
err = nil
Expand All @@ -58,7 +58,7 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) {
if t.readWriter == nil {
return 0, ErrWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)
ek := t.EncodeStringDataKey(key)
// txn Inc will lock this key, so we don't lock it here.
n, err := kv.IncInt64(t.readWriter, ek, step)
if kv.ErrNotExist.Equal(err) {
Expand All @@ -72,7 +72,7 @@ func (t *TxStructure) Clear(key []byte) error {
if t.readWriter == nil {
return ErrWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)
ek := t.EncodeStringDataKey(key)
err := t.readWriter.Delete(ek)
if kv.ErrNotExist.Equal(err) {
err = nil
Expand Down
3 changes: 2 additions & 1 deletion structure/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const (
// Make linter happy, since encodeHashMetaKey is unused in this repo.
var _ = (&TxStructure{}).encodeHashMetaKey

func (t *TxStructure) encodeStringDataKey(key []byte) kv.Key {
// EncodeStringDataKey will encode string key.
func (t *TxStructure) EncodeStringDataKey(key []byte) kv.Key {
// for codec Encode, we may add extra bytes data, so here and following encode
// we will use extra length like 4 for a little optimization.
ek := make([]byte, 0, len(t.prefix)+len(key)+24)
Expand Down