diff --git a/config/config.go b/config/config.go index 224df6fe022c3..9fcf062cf2270 100644 --- a/config/config.go +++ b/config/config.go @@ -40,18 +40,19 @@ var ( // Config contains configuration options. type Config struct { - Host string `toml:"host" json:"host"` - Port uint `toml:"port" json:"port"` - Store string `toml:"store" json:"store"` - Path string `toml:"path" json:"path"` - Socket string `toml:"socket" json:"socket"` - BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` - Lease string `toml:"lease" json:"lease"` - RunDDL bool `toml:"run-ddl" json:"run-ddl"` - SplitTable bool `toml:"split-table" json:"split-table"` - TokenLimit uint `toml:"token-limit" json:"token-limit"` - OOMAction string `toml:"oom-action" json:"oom-action"` - EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` + Host string `toml:"host" json:"host"` + Port uint `toml:"port" json:"port"` + Store string `toml:"store" json:"store"` + Path string `toml:"path" json:"path"` + Socket string `toml:"socket" json:"socket"` + BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` + Lease string `toml:"lease" json:"lease"` + RunDDL bool `toml:"run-ddl" json:"run-ddl"` + SplitTable bool `toml:"split-table" json:"split-table"` + TokenLimit uint `toml:"token-limit" json:"token-limit"` + OOMAction string `toml:"oom-action" json:"oom-action"` + EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` + TxnLocalLatches TxnLocalLatches `toml:"txn-local-latches" json:"txn-local-latches"` // Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html. // TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive. LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"` @@ -167,6 +168,12 @@ type PlanCache struct { Shards uint `toml:"shards" json:"shards"` } +// TxnLocalLatches is the TxnLocalLatches section of the config. +type TxnLocalLatches struct { + Enabled bool `toml:"enabled" json:"enabled"` + Capacity uint `toml:"capacity" json:"capacity"` +} + // PreparedPlanCache is the PreparedPlanCache section of the config. type PreparedPlanCache struct { Enabled bool `toml:"enabled" json:"enabled"` @@ -220,16 +227,20 @@ type TiKVClient struct { } var defaultConf = Config{ - Host: "0.0.0.0", - Port: 4000, - Store: "mocktikv", - Path: "/tmp/tidb", - RunDDL: true, - SplitTable: true, - Lease: "45s", - TokenLimit: 1000, - OOMAction: "log", - EnableStreaming: false, + Host: "0.0.0.0", + Port: 4000, + Store: "mocktikv", + Path: "/tmp/tidb", + RunDDL: true, + SplitTable: true, + Lease: "45s", + TokenLimit: 1000, + OOMAction: "log", + EnableStreaming: false, + TxnLocalLatches: TxnLocalLatches{ + Enabled: false, + Capacity: 1024000, + }, LowerCaseTableNames: 2, Log: Log{ Level: "info", diff --git a/config/config.toml.example b/config/config.toml.example index a3e1f0bc1d76d..d850b30044e4e 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -214,3 +214,9 @@ grpc-connection-count = 16 # max time for commit command, must be twice bigger than raft election timeout. commit-timeout = "41s" + +[txn-local-latches] +# Enable local latches for transactions. Enable it when +# there are lots of conflicts between transactions. +enabled = false +capacity = 1024000 diff --git a/executor/simple.go b/executor/simple.go index a4d4387bccd81..51dd3b6196a87 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -206,7 +206,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) + err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } @@ -238,7 +238,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) + err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } diff --git a/kv/txn.go b/kv/txn.go index 5bea8f384ae94..1318998362ef1 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -24,6 +24,12 @@ import ( "golang.org/x/net/context" ) +// ContextKey is the type of context's key +type ContextKey string + +// Retryable is the key in context +const Retryable ContextKey = "Retryable" + // RunInNewTxn will run the f in a new transaction environment. func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error { var ( @@ -54,7 +60,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e return errors.Trace(err) } - err = txn.Commit(context.Background()) + err = txn.Commit(context.WithValue(context.Background(), Retryable, retryable)) if err == nil { break } diff --git a/server/conn.go b/server/conn.go index df2cb26a7b25a..07f12970e47d1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -806,7 +806,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } return errors.Trace(err) } - return errors.Trace(txn.Commit(sessionctx.SetConnID2Ctx(ctx, loadDataInfo.Ctx))) + return errors.Trace(txn.Commit(sessionctx.SetCommitCtx(ctx, loadDataInfo.Ctx))) } // handleLoadStats does the additional work after processing the 'load stats' query. diff --git a/session/session.go b/session/session.go index 2e258945ee22e..632e56cd17c8d 100644 --- a/session/session.go +++ b/session/session.go @@ -320,7 +320,7 @@ func (s *session) doCommit(ctx context.Context) error { schemaVer: s.sessionVars.TxnCtx.SchemaVersion, relatedTableIDs: tableIDs, }) - if err := s.txn.Commit(sessionctx.SetConnID2Ctx(ctx, s)); err != nil { + if err := s.txn.Commit(sessionctx.SetCommitCtx(ctx, s)); err != nil { return errors.Trace(err) } return nil diff --git a/sessionctx/context.go b/sessionctx/context.go index 93d1cbe5966c5..fff2e3c46ce71 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -107,12 +107,22 @@ const ( LastExecuteDDL basicCtxType = 3 ) -type contextKey string - // ConnID is the key in context. -const ConnID contextKey = "conn ID" +const ConnID kv.ContextKey = "conn ID" + +// SetCommitCtx sets the variables for context before commit a transaction. +func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context { + ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) + retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate + return context.WithValue(ctx, kv.Retryable, retryAble) +} -// SetConnID2Ctx sets the connection ID to context. -func SetConnID2Ctx(ctx context.Context, sessCtx Context) context.Context { - return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) +// GetRetryable returns the value of Retryable from the ctx. +func GetRetryable(ctx context.Context) bool { + var retryable bool + val := ctx.Value(kv.Retryable) + if val != nil { + retryable = val.(bool) + } + return retryable } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index ccd4d43608408..12afbb35d8f77 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -563,6 +563,17 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { // should be less than `gcRunInterval`. const maxTxnTimeUse = 590000 +func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { + err := c.execute(ctx) + if err != nil { + c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) + } else { + c.txn.commitTS = c.commitTS + c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) + } + return errors.Trace(err) +} + // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) error { defer func() { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 54a738d1c2989..9dae0941e5fc3 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -45,6 +45,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) { spkv := NewMockSafePointKV() store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false) c.Assert(err, IsNil) + store.EnableTxnLocalLatches(1024000) s.store = store CommitMaxBackoff = 2000 } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index acce41f8b91cd..d659f20dd3452 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/store/tikv/latch" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -71,6 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) { defer mc.Unlock() security := config.GetGlobalConfig().Security + txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches etcdAddrs, disableGC, err := parsePath(path) if err != nil { return nil, errors.Trace(err) @@ -109,6 +111,9 @@ func (d Driver) Open(path string) (kv.Storage, error) { if err != nil { return nil, errors.Trace(err) } + if txnLocalLatches.Enabled { + s.EnableTxnLocalLatches(txnLocalLatches.Capacity) + } s.etcdAddrs = etcdAddrs s.tlsConfig = tlsConfig @@ -127,6 +132,7 @@ type tikvStore struct { pdClient pd.Client regionCache *RegionCache lockResolver *LockResolver + txnLatches *latch.LatchesScheduler gcWorker GCHandler etcdAddrs []string tlsConfig *tls.Config @@ -190,6 +196,10 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie return store, nil } +func (s *tikvStore) EnableTxnLocalLatches(size uint) { + s.txnLatches = latch.NewScheduler(size) +} + func (s *tikvStore) EtcdAddrs() []string { return s.etcdAddrs } @@ -274,6 +284,10 @@ func (s *tikvStore) Close() error { if err := s.client.Close(); err != nil { return errors.Trace(err) } + + if s.txnLatches != nil { + s.txnLatches.Close() + } return nil } diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 70f818c853daa..a8ca718947490 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -86,6 +86,11 @@ func (l *Lock) isLocked() bool { return !l.isStale && l.acquiredCount != len(l.requiredSlots) } +// SetCommitTS sets the lock's commitTS. +func (l *Lock) SetCommitTS(commitTS uint64) { + l.commitTS = commitTS +} + // Latches which are used for concurrency control. // Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable, // but conceptually a latch is a queue, and a slot is an index to the queue @@ -98,7 +103,7 @@ type Latches struct { // NewLatches create a Latches with fixed length, // the size will be rounded up to the power of 2. -func NewLatches(size int) *Latches { +func NewLatches(size uint) *Latches { powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) slots := make([]latch, powerOfTwoSize) return &Latches{ diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index 050b3e2172794..f3ffad7a77d9f 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -28,7 +28,7 @@ type LatchesScheduler struct { } // NewScheduler create the LatchesScheduler. -func NewScheduler(size int) *LatchesScheduler { +func NewScheduler(size uint) *LatchesScheduler { latches := NewLatches(size) unlockCh := make(chan *Lock, lockChanSize) scheduler := &LatchesScheduler{ @@ -83,8 +83,8 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock { return lock } -// UnLock unlocks a lock with commitTS. -func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) { +// UnLock unlocks a lock. +func (scheduler *LatchesScheduler) UnLock(lock *Lock) { scheduler.RLock() defer scheduler.RUnlock() if !scheduler.closed { diff --git a/store/tikv/latch/scheduler_test.go b/store/tikv/latch/scheduler_test.go index 80970f40dbff2..d57737fb9512d 100644 --- a/store/tikv/latch/scheduler_test.go +++ b/store/tikv/latch/scheduler_test.go @@ -41,10 +41,11 @@ func (s *testSchedulerSuite) TestWithConcurrency(c *C) { for _, txn := range txns { go func(txn [][]byte, wg *sync.WaitGroup) { lock := sched.Lock(getTso(), txn) - defer sched.UnLock(lock, getTso()) + defer sched.UnLock(lock) if lock.IsStale() { // Should restart the transaction or return error } else { + lock.SetCommitTS(getTso()) // Do 2pc } wg.Done() diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index d461e732f84b8..0e48ec6cabc97 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -35,6 +35,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien uid := uuid.NewV4().String() spkv := NewMockSafePointKV() tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false) + tikvStore.EnableTxnLocalLatches(1024000) tikvStore.mock = true return tikvStore, errors.Trace(err) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index f2a211bd5cbc7..1987bee8b6401 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" - binlog "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -185,20 +184,40 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { connID = val.(uint64) } committer, err := newTwoPhaseCommitter(txn, connID) - if err != nil { + if err != nil || committer == nil { return errors.Trace(err) } - if committer == nil { - return nil + // latches disabled + if txn.store.txnLatches == nil { + err = committer.executeAndWriteFinishBinlog(ctx) + log.Debug("[kv]", connID, " txnLatches disabled, 2pc directly:", err) + return errors.Trace(err) } - err = committer.execute(ctx) - if err != nil { - committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0) + + // latches enabled + // for transactions not retryable, commit directly. + if !sessionctx.GetRetryable(ctx) { + err = committer.executeAndWriteFinishBinlog(ctx) + if err == nil { + txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS) + } + log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err) return errors.Trace(err) } - committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS)) - txn.commitTS = committer.commitTS - return nil + + // for transactions which need to acquire latches + lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys) + defer txn.store.txnLatches.UnLock(lock) + if lock.IsStale() { + err = errors.Errorf("startTS %d is stale", txn.startTS) + return errors.Annotate(err, txnRetryableMark) + } + err = committer.executeAndWriteFinishBinlog(ctx) + if err == nil { + lock.SetCommitTS(committer.commitTS) + } + log.Debug("[kv]", connID, " txnLatches enabled while txn retryable:", err) + return errors.Trace(err) } func (txn *tikvTxn) close() {