Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: tiny refactor code to reduce txn conflict on 'table_cache_meta' #32387

Merged
merged 7 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *testDDLSuiteToVerify) TestPlacementPolicyInUse() {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
Expand Down
25 changes: 3 additions & 22 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD atomicutil.Int32 // !0: true, 0: false.
renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
Expand Down Expand Up @@ -284,7 +283,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
Expand Down Expand Up @@ -731,7 +730,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

Expand Down Expand Up @@ -858,10 +856,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
do.wg.Add(4)
do.wg.Add(3)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
go do.renewLease()
go do.globalConfigSyncerKeeper()
if !skipRegisterToDashboard {
do.wg.Add(1)
Expand Down Expand Up @@ -1782,22 +1779,6 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache.Insert(is, 0)
}

func (do *Domain) renewLease() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("renew lease goroutine exited.")
}()
for {
select {
case <-do.exit:
close(do.renewLeaseCh)
return
case op := <-do.renewLeaseCh:
op()
}
}
}

func init() {
initByLDFlagsForGlobalKill()
}
Expand Down
2 changes: 1 addition & 1 deletion executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu
}

func newSlowQueryRetriever() (*slowQueryRetriever, error) {
newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0)
if err != nil {
return nil, err
}
Expand Down
14 changes: 6 additions & 8 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type Builder struct {
// TODO: store is only used by autoid allocators
// detach allocators from storage, use passed transaction in the feature
store kv.Storage
// TODO: renewLeaseCh is only used to pass data between table and domain
renewLeaseCh chan func()
factory func() (pools.Resource, error)

factory func() (pools.Resource, error)
}

// ApplyDiff applies SchemaDiff to the new InfoSchema.
Expand Down Expand Up @@ -711,7 +710,7 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf
return nil, errors.Trace(err)
}

err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor))
err = t.Init(tmp.(sqlexec.SQLExecutor))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -755,7 +754,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc)
}

// NewBuilder creates a new Builder with a Handle.
func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder {
func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder {
return &Builder{
store: store,
is: &infoSchema{
Expand All @@ -764,9 +763,8 @@ func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Res
ruleBundleMap: map[string]*placement.Bundle{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
dirtyDB: make(map[string]bool),
renewLeaseCh: renewCh,
factory: factory,
dirtyDB: make(map[string]bool),
factory: factory,
}
}

Expand Down
6 changes: 3 additions & 3 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestBasic(t *testing.T) {
})
require.NoError(t, err)

builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1)
builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1)
require.NoError(t, err)

txn, err := store.Begin()
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestInfoTables(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down Expand Up @@ -319,7 +319,7 @@ func TestGetBundle(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table
type CachedTable interface {
Table

Init(renewCh chan func(), exec sqlexec.SQLExecutor) error
Init(exec sqlexec.SQLExecutor) error

// TryReadFromCache checks if the cache table is readable.
TryReadFromCache(ts uint64, leaseDuration time.Duration) kv.MemBuffer
Expand Down
71 changes: 30 additions & 41 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package tables

import (
"context"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -51,15 +50,14 @@ type cachedTable struct {
TableCommon
cacheData atomic.Value
handle StateRemote
renewCh chan func()
totalSize int64

mu struct {
sync.RWMutex
lockingForRead bool
}
lockingForRead tokenLimit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be a semaphore, why not just use semaphore in go lib?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with semaphore, the channel is more simple for me here.

renewReadLease tokenLimit
}

type tokenLimit = chan struct{}

// cacheData pack the cache data and lease.
type cacheData struct {
Start uint64
Expand Down Expand Up @@ -94,11 +92,10 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k
nowTime := oracle.GetTimeFromTS(ts)
distance := leaseTime.Sub(nowTime)
if distance >= 0 && distance <= leaseDuration/2 {
op := c.renewLease(ts, RenewReadLease, data, leaseDuration)
select {
case c.renewCh <- op:
case c.renewReadLease <- struct{}{}:
go c.renewLease(ts, data, leaseDuration)
default:
// Skip this time, if the previous renew lease operation hasn't finished.
}
}
return data.MemBuffer
Expand All @@ -109,15 +106,16 @@ func (c *cachedTable) TryReadFromCache(ts uint64, leaseDuration time.Duration) k
// newCachedTable creates a new CachedTable Instance
func newCachedTable(tbl *TableCommon) (table.Table, error) {
ret := &cachedTable{
TableCommon: *tbl,
TableCommon: *tbl,
lockingForRead: make(chan struct{}, 1),
renewReadLease: make(chan struct{}, 1),
}
return ret, nil
}

// Init is an extra operation for cachedTable after TableFromMeta,
// Because cachedTable need some additional parameter that can't be passed in TableFromMeta.
func (c *cachedTable) Init(renewCh chan func(), exec sqlexec.SQLExecutor) error {
c.renewCh = renewCh
func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error {
raw, ok := exec.(sqlExec)
if !ok {
return errors.New("Need sqlExec rather than sqlexec.SQLExecutor")
Expand Down Expand Up @@ -172,19 +170,12 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64)
}

func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) {
c.mu.RLock()
lockingForRead := c.mu.lockingForRead
c.mu.RUnlock()
if lockingForRead {
select {
case c.lockingForRead <- struct{}{}:
go c.updateLockForRead(ctx, store, ts, leaseDuration)
default:
// There is a inflight calling already.
return
}

c.mu.Lock()
c.mu.lockingForRead = true
c.mu.Unlock()

go c.updateLockForRead(ctx, store, ts, leaseDuration)
}

func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) {
Expand All @@ -194,9 +185,7 @@ func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, t
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
c.mu.Lock()
c.mu.lockingForRead = false
c.mu.Unlock()
<-c.lockingForRead
}()

// Load data from original table and the update lock information.
Expand Down Expand Up @@ -260,20 +249,20 @@ func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []typ
return c.TableCommon.RemoveRecord(sctx, h, r)
}

func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData, leaseDuration time.Duration) func() {
return func() {
tid := c.Meta().ID
lease := leaseFromTS(ts, leaseDuration)
succ, err := c.handle.RenewLease(context.Background(), tid, lease, op)
if err != nil {
log.Warn("Renew read lease error", zap.Error(err))
}
if succ {
c.cacheData.Store(&cacheData{
Start: data.Start,
Lease: lease,
MemBuffer: data.MemBuffer,
})
}
func (c *cachedTable) renewLease(ts uint64, data *cacheData, leaseDuration time.Duration) {
defer func() { <-c.renewReadLease }()

tid := c.Meta().ID
lease := leaseFromTS(ts, leaseDuration)
succ, err := c.handle.RenewLease(context.Background(), tid, lease, RenewReadLease)
if err != nil && !kv.IsTxnRetryableError(err) {
log.Warn("Renew read lease error", zap.Error(err))
}
if succ {
c.cacheData.Store(&cacheData{
Start: data.Start,
Lease: lease,
MemBuffer: data.MemBuffer,
})
}
}
4 changes: 2 additions & 2 deletions table/tables/state_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newL
}

func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "begin")
_, err := h.execSQL(ctx, "begin optimistic")
return err
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co
}

func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) {
chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid)
chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid)
if err != nil {
return 0, 0, 0, errors.Trace(err)
}
Expand Down