Skip to content

Commit

Permalink
tikv/txn: support local latch in transaction (#6418)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored and jackysp committed May 7, 2018
1 parent 1da01ea commit 4185e53
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 48 deletions.
55 changes: 33 additions & 22 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ var (

// Config contains configuration options.
type Config struct {
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
TxnLocalLatches TxnLocalLatches `toml:"txn-local-latches" json:"txn-local-latches"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand Down Expand Up @@ -167,6 +168,12 @@ type PlanCache struct {
Shards uint `toml:"shards" json:"shards"`
}

// TxnLocalLatches is the TxnLocalLatches section of the config.
type TxnLocalLatches struct {
Enabled bool `toml:"enabled" json:"enabled"`
Capacity uint `toml:"capacity" json:"capacity"`
}

// PreparedPlanCache is the PreparedPlanCache section of the config.
type PreparedPlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Expand Down Expand Up @@ -220,16 +227,20 @@ type TiKVClient struct {
}

var defaultConf = Config{
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 1024000,
},
LowerCaseTableNames: 2,
Log: Log{
Level: "info",
Expand Down
6 changes: 6 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,9 @@ grpc-connection-count = 16

# max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
enabled = false
capacity = 1024000
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 7 additions & 1 deletion kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
"golang.org/x/net/context"
)

// ContextKey is the type of context's key
type ContextKey string

// Retryable is the key in context
const Retryable ContextKey = "Retryable"

// RunInNewTxn will run the f in a new transaction environment.
func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error {
var (
Expand Down Expand Up @@ -54,7 +60,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
return errors.Trace(err)
}

err = txn.Commit(context.Background())
err = txn.Commit(context.WithValue(context.Background(), Retryable, retryable))
if err == nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
}
return errors.Trace(err)
}
return errors.Trace(txn.Commit(sessionctx.SetConnID2Ctx(ctx, loadDataInfo.Ctx)))
return errors.Trace(txn.Commit(sessionctx.SetCommitCtx(ctx, loadDataInfo.Ctx)))
}

// handleLoadStats does the additional work after processing the 'load stats' query.
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *session) doCommit(ctx context.Context) error {
schemaVer: s.sessionVars.TxnCtx.SchemaVersion,
relatedTableIDs: tableIDs,
})
if err := s.txn.Commit(sessionctx.SetConnID2Ctx(ctx, s)); err != nil {
if err := s.txn.Commit(sessionctx.SetCommitCtx(ctx, s)); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
22 changes: 16 additions & 6 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,22 @@ const (
LastExecuteDDL basicCtxType = 3
)

type contextKey string

// ConnID is the key in context.
const ConnID contextKey = "conn ID"
const ConnID kv.ContextKey = "conn ID"

// SetCommitCtx sets the variables for context before commit a transaction.
func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context {
ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate
return context.WithValue(ctx, kv.Retryable, retryAble)
}

// SetConnID2Ctx sets the connection ID to context.
func SetConnID2Ctx(ctx context.Context, sessCtx Context) context.Context {
return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
// GetRetryable returns the value of Retryable from the ctx.
func GetRetryable(ctx context.Context) bool {
var retryable bool
val := ctx.Value(kv.Retryable)
if val != nil {
retryable = val.(bool)
}
return retryable
}
11 changes: 11 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,17 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
// should be less than `gcRunInterval`.
const maxTxnTimeUse = 590000

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
defer func() {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
spkv := NewMockSafePointKV()
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false)
c.Assert(err, IsNil)
store.EnableTxnLocalLatches(1024000)
s.store = store
CommitMaxBackoff = 2000
}
Expand Down
14 changes: 14 additions & 0 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/latch"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
defer mc.Unlock()

security := config.GetGlobalConfig().Security
txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches
etcdAddrs, disableGC, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -109,6 +111,9 @@ func (d Driver) Open(path string) (kv.Storage, error) {
if err != nil {
return nil, errors.Trace(err)
}
if txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(txnLocalLatches.Capacity)
}
s.etcdAddrs = etcdAddrs
s.tlsConfig = tlsConfig

Expand All @@ -127,6 +132,7 @@ type tikvStore struct {
pdClient pd.Client
regionCache *RegionCache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
gcWorker GCHandler
etcdAddrs []string
tlsConfig *tls.Config
Expand Down Expand Up @@ -190,6 +196,10 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie
return store, nil
}

func (s *tikvStore) EnableTxnLocalLatches(size uint) {
s.txnLatches = latch.NewScheduler(size)
}

func (s *tikvStore) EtcdAddrs() []string {
return s.etcdAddrs
}
Expand Down Expand Up @@ -274,6 +284,10 @@ func (s *tikvStore) Close() error {
if err := s.client.Close(); err != nil {
return errors.Trace(err)
}

if s.txnLatches != nil {
s.txnLatches.Close()
}
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (l *Lock) isLocked() bool {
return !l.isStale && l.acquiredCount != len(l.requiredSlots)
}

// SetCommitTS sets the lock's commitTS.
func (l *Lock) SetCommitTS(commitTS uint64) {
l.commitTS = commitTS
}

// Latches which are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable,
// but conceptually a latch is a queue, and a slot is an index to the queue
Expand All @@ -98,7 +103,7 @@ type Latches struct {

// NewLatches create a Latches with fixed length,
// the size will be rounded up to the power of 2.
func NewLatches(size int) *Latches {
func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/latch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type LatchesScheduler struct {
}

// NewScheduler create the LatchesScheduler.
func NewScheduler(size int) *LatchesScheduler {
func NewScheduler(size uint) *LatchesScheduler {
latches := NewLatches(size)
unlockCh := make(chan *Lock, lockChanSize)
scheduler := &LatchesScheduler{
Expand Down Expand Up @@ -83,8 +83,8 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock {
return lock
}

// UnLock unlocks a lock with commitTS.
func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) {
// UnLock unlocks a lock.
func (scheduler *LatchesScheduler) UnLock(lock *Lock) {
scheduler.RLock()
defer scheduler.RUnlock()
if !scheduler.closed {
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/latch/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ func (s *testSchedulerSuite) TestWithConcurrency(c *C) {
for _, txn := range txns {
go func(txn [][]byte, wg *sync.WaitGroup) {
lock := sched.Lock(getTso(), txn)
defer sched.UnLock(lock, getTso())
defer sched.UnLock(lock)
if lock.IsStale() {
// Should restart the transaction or return error
} else {
lock.SetCommitTS(getTso())
// Do 2pc
}
wg.Done()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
uid := uuid.NewV4().String()
spkv := NewMockSafePointKV()
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false)
tikvStore.EnableTxnLocalLatches(1024000)
tikvStore.mock = true
return tikvStore, errors.Trace(err)
}
39 changes: 29 additions & 10 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -185,20 +184,40 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
connID = val.(uint64)
}
committer, err := newTwoPhaseCommitter(txn, connID)
if err != nil {
if err != nil || committer == nil {
return errors.Trace(err)
}
if committer == nil {
return nil
// latches disabled
if txn.store.txnLatches == nil {
err = committer.executeAndWriteFinishBinlog(ctx)
log.Debug("[kv]", connID, " txnLatches disabled, 2pc directly:", err)
return errors.Trace(err)
}
err = committer.execute(ctx)
if err != nil {
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)

// latches enabled
// for transactions not retryable, commit directly.
if !sessionctx.GetRetryable(ctx) {
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS)
}
log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err)
return errors.Trace(err)
}
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
txn.commitTS = committer.commitTS
return nil

// for transactions which need to acquire latches
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
return errors.Annotate(err, txnRetryableMark)
}
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
log.Debug("[kv]", connID, " txnLatches enabled while txn retryable:", err)
return errors.Trace(err)
}

func (txn *tikvTxn) close() {
Expand Down

0 comments on commit 4185e53

Please sign in to comment.