diff --git a/config/config.go b/config/config.go index cada69fe3edbd..7313af776a9a0 100644 --- a/config/config.go +++ b/config/config.go @@ -82,6 +82,7 @@ type Config struct { Binlog Binlog `toml:"binlog" json:"binlog"` CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"` Plugin Plugin `toml:"plugin" json:"plugin"` + PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic_txn"` CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"` // TreatOldVersionUTF8AsUTF8MB4 is use to treat old version table/column UTF8 charset as UTF8MB4. This is for compatibility. // Currently not support dynamic modify, because this need to reload all old version schema. @@ -291,6 +292,18 @@ type Plugin struct { Load string `toml:"load" json:"load"` } +// PessimisticTxn is the config for pessimistic transaction. +type PessimisticTxn struct { + // Enable must be true for 'begin lock' or session variable to start a pessimistic transaction. + Enable bool `toml:"enable" json:"enable"` + // Starts a pessimistic transaction by default when Enable is true. + Default bool `toml:"default" json:"default"` + // The max count of retry for a single statement in a pessimistic transaction. + MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"` + // The pessimistic lock ttl in milliseconds. + TTL uint64 `toml:"ttl" json:"ttl"` +} + var defaultConf = Config{ Host: "0.0.0.0", AdvertiseAddress: "", @@ -374,6 +387,12 @@ var defaultConf = Config{ WriteTimeout: "15s", Strategy: "range", }, + PessimisticTxn: PessimisticTxn{ + Enable: false, + Default: false, + MaxRetryCount: 256, + TTL: 60 * 1000, + }, } var ( diff --git a/config/config.toml.example b/config/config.toml.example index a8e8a15abd23e..589046ee46a1d 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -282,4 +282,17 @@ ignore-error = false binlog-socket = "" # the strategy for sending binlog to pump, value can be "range" or "hash" now. -strategy = "range" \ No newline at end of file +strategy = "range" + +[pessimistic-txn] +# enable pessimistic transaction. +enable = false + +# start pessimistic transaction by default. +default = false + +# max retry count for a statement in a pessimistic transaction. +max-retry-count = 256 + +# default TTL in milliseconds for pessimistic lock. +ttl = 60000 diff --git a/ddl/index.go b/ddl/index.go index 2164d763ac685..9e43ce42a4c5c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -793,7 +793,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. 
- err := txn.LockKeys(idxRecord.key) + err := txn.LockKeys(context.Background(), 0, idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index 07ac3fcfd0fa1..3709a34ecfbd6 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "strconv" "strings" "sync/atomic" "time" @@ -39,6 +40,7 @@ import ( "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" @@ -151,8 +153,10 @@ type ExecStmt struct { Ctx sessionctx.Context // StartTime stands for the starting time when executing the statement. - StartTime time.Time - isPreparedStmt bool + StartTime time.Time + isPreparedStmt bool + isSelectForUpdate bool + retryCount uint } // OriginText returns original statement as a string. @@ -219,7 +223,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }() } - e, err := a.buildExecutor(sctx) + e, err := a.buildExecutor() if err != nil { return nil, err } @@ -246,19 +250,29 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode) } + isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic + + // Special handle for "select for update statement" in pessimistic transaction. + if isPessimistic && a.isSelectForUpdate { + return a.handlePessimisticSelectForUpdate(ctx, e) + } + // If the executor doesn't return any result to the client, we execute it without delay. if e.Schema().Len() == 0 { - return a.handleNoDelayExecutor(ctx, sctx, e) + if isPessimistic { + return nil, a.handlePessimisticDML(ctx, e) + } + return a.handleNoDelayExecutor(ctx, e) } else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay { // Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example: // the Projection has two expressions and two columns in the schema, but we should // not return the result of the two expressions. 
- return a.handleNoDelayExecutor(ctx, sctx, e) + return a.handleNoDelayExecutor(ctx, e) } var txnStartTS uint64 - txn, err1 := sctx.Txn(false) - if err1 != nil { + txn, err := sctx.Txn(false) + if err != nil { return nil, err } if txn.Valid() { @@ -271,7 +285,81 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }, nil } -func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) { +type chunkRowRecordSet struct { + rows []chunk.Row + idx int + fields []*ast.ResultField + e Executor +} + +func (c *chunkRowRecordSet) Fields() []*ast.ResultField { + return c.fields +} + +func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error { + chk := req.Chunk + chk.Reset() + for !chk.IsFull() && c.idx < len(c.rows) { + chk.AppendRow(c.rows[c.idx]) + c.idx++ + } + return nil +} + +func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch { + return chunk.NewRecordBatch(c.e.newFirstChunk()) +} + +func (c *chunkRowRecordSet) Close() error { + return nil +} + +func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) { + for { + rs, err := a.runPessimisticSelectForUpdate(ctx, e) + e, err = a.handlePessimisticLockError(ctx, err) + if err != nil { + return nil, err + } + if e == nil { + return rs, nil + } + } +} + +func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) { + rs := &recordSet{ + executor: e, + stmt: a, + } + defer func() { + terror.Log(rs.Close()) + }() + + var rows []chunk.Row + var err error + fields := rs.Fields() + req := rs.NewRecordBatch() + for { + err = rs.Next(ctx, req) + if err != nil { + // Handle 'write conflict' error. + break + } + if req.NumRows() == 0 { + return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil + } + iter := chunk.NewIterator4Chunk(req.Chunk) + for r := iter.Begin(); r != iter.End(); r = iter.Next() { + rows = append(rows, r) + } + req.Chunk = chunk.Renew(req.Chunk, a.Ctx.GetSessionVars().MaxChunkSize) + } + return nil, err +} + +func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) { + sctx := a.Ctx if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -297,12 +385,114 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co if err != nil { return nil, err } + return nil, err +} + +func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { + sctx := a.Ctx + txn, err := sctx.Txn(true) + if err != nil { + return err + } + txnCtx := sctx.GetSessionVars().TxnCtx + for { + _, err = a.handleNoDelayExecutor(ctx, e) + if err != nil { + return err + } + keys, err1 := txn.(pessimisticTxn).KeysNeedToLock() + if err1 != nil { + return err1 + } + if len(keys) == 0 { + return nil + } + forUpdateTS := txnCtx.GetForUpdateTS() + err = txn.LockKeys(ctx, forUpdateTS, keys...) + e, err = a.handlePessimisticLockError(ctx, err) + if err != nil { + return err + } + if e == nil { + return nil + } + } +} + +// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict. 
+func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) { + if err == nil { + return nil, nil + } + errStr := err.Error() + if !strings.Contains(errStr, util.WriteConflictMarker) { + return nil, err + } + if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount { + return nil, errors.New("pessimistic lock retry limit reached") + } + a.retryCount++ + conflictTS := extractConflictTS(errStr) + if conflictTS == 0 { + logutil.Logger(ctx).Warn("failed to extract conflictTS from a conflict error") + } + sctx := a.Ctx + txnCtx := sctx.GetSessionVars().TxnCtx + forUpdateTS := txnCtx.GetForUpdateTS() + logutil.Logger(ctx).Info("pessimistic write conflict, retry statement", + zap.Uint64("txn", txnCtx.StartTS), + zap.Uint64("forUpdateTS", forUpdateTS), + zap.Uint64("conflictTS", conflictTS)) + if conflictTS > txnCtx.GetForUpdateTS() { + txnCtx.SetForUpdateTS(conflictTS) + } else { + ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx) + if err1 != nil { + return nil, err1 + } + txnCtx.SetForUpdateTS(ts) + } + e, err := a.buildExecutor() + if err != nil { + return nil, err + } + // Rollback the statement change before retry it. + sctx.StmtRollback() + sctx.GetSessionVars().StmtCtx.ResetForRetry() + + if err = e.Open(ctx); err != nil { + return nil, err + } + return e, nil +} + +func extractConflictTS(errStr string) uint64 { + strs := strings.Split(errStr, "conflictTS=") + if len(strs) != 2 { + return 0 + } + tsPart := strs[1] + length := strings.IndexByte(tsPart, ',') + if length < 0 { + return 0 + } + tsStr := tsPart[:length] + ts, err := strconv.ParseUint(tsStr, 10, 64) + if err != nil { + return 0 + } + return ts +} - return nil, nil +type pessimisticTxn interface { + kv.Transaction + // KeysNeedToLock returns the keys need to be locked. + KeysNeedToLock() ([]kv.Key, error) } // buildExecutor build a executor from plan, prepared statement may need additional procedure. -func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) { +func (a *ExecStmt) buildExecutor() (Executor, error) { + ctx := a.Ctx if _, ok := a.Plan.(*plannercore.Execute); !ok { // Do not sync transaction for Execute statement, because the real optimization work is done in // "ExecuteExec.Build". @@ -344,7 +534,7 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) { // ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement. 
if executorExec, ok := e.(*ExecuteExec); ok { - err := executorExec.Build() + err := executorExec.Build(b) if err != nil { return nil, err } @@ -352,6 +542,7 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) { a.Plan = executorExec.plan e = executorExec.stmtExec } + a.isSelectForUpdate = b.isSelectForUpdate return e, nil } diff --git a/executor/admin.go b/executor/admin.go index dd05865e42038..e6721dd976f89 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa } recordKey := e.table.RecordKey(row.handle) - err := txn.LockKeys(recordKey) + err := txn.LockKeys(ctx, 0, recordKey) if err != nil { return result, err } diff --git a/executor/builder.go b/executor/builder.go index 915ed68ebccc7..8e68f586f5f2d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -70,7 +70,8 @@ type executorBuilder struct { is infoschema.InfoSchema startTS uint64 // cached when the first time getStartTS() is called // err is set when there is error happened during Executor building process. - err error + err error + isSelectForUpdate bool } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder { @@ -471,6 +472,10 @@ func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor { } func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor { + b.isSelectForUpdate = true + // Build 'select for update' using the 'for update' ts. + b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() + src := b.build(v.Children()[0]) if b.err != nil { return nil @@ -591,6 +596,7 @@ func (b *executorBuilder) buildSet(v *plannercore.Set) Executor { } func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { + b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selectExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -1265,6 +1271,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { for id := range v.SelectPlan.Schema().TblID2Handle { tblID2table[id], _ = b.is.TableByID(id) } + b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -1346,6 +1353,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { for id := range v.SelectPlan.Schema().TblID2Handle { tblID2table[id], _ = b.is.TableByID(id) } + b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selExec := b.build(v.SelectPlan) if b.err != nil { return nil diff --git a/executor/executor.go b/executor/executor.go index afbc20045ba23..16bf5746049dd 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -656,6 +656,11 @@ func (e *SelectLockExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. 
func (e *SelectLockExec) Next(ctx context.Context, req *chunk.RecordBatch) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("selectLock.Next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + req.GrowAndReset(e.maxChunkSize) err := e.children[0].Next(ctx, req) if err != nil { @@ -671,13 +676,17 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.RecordBatch) error } keys := make([]kv.Key, 0, req.NumRows()) iter := chunk.NewIterator4Chunk(req.Chunk) + forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() for id, cols := range e.Schema().TblID2Handle { for _, col := range cols { keys = keys[:0] for row := iter.Begin(); row != iter.End(); row = iter.Next() { keys = append(keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index))) } - err = txn.LockKeys(keys...) + if len(keys) == 0 { + continue + } + err = txn.LockKeys(ctx, forUpdateTS, keys...) if err != nil { return err } diff --git a/executor/point_get.go b/executor/point_get.go index 9ec774c4621e3..d045c97935de7 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -154,7 +154,16 @@ func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) { return nil, err } if txn != nil && txn.Valid() && !txn.IsReadOnly() { - return txn.Get(key) + // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be + // different for pessimistic transaction. + val, err = txn.GetMemBuffer().Get(key) + if err == nil { + return val, err + } + if !kv.IsErrNotFound(err) { + return nil, err + } + // fallthrough to snapshot get. } return e.snapshot.Get(key) } diff --git a/executor/prepared.go b/executor/prepared.go index bfdb1cc6611f3..73dbd092e7d46 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -217,7 +217,7 @@ func (e *ExecuteExec) Next(ctx context.Context, req *chunk.RecordBatch) error { // Build builds a prepared statement into an executor. // After Build, e.StmtExec will be used to do the real execution. -func (e *ExecuteExec) Build() error { +func (e *ExecuteExec) Build(b *executorBuilder) error { ok, err := IsPointGetWithPKOrUniqueKeyByAutoCommit(e.ctx, e.plan) if err != nil { return err @@ -228,7 +228,6 @@ func (e *ExecuteExec) Build() error { if err != nil { return err } - b := newExecutorBuilder(e.ctx, e.is) stmtExec := b.build(e.plan) if b.err != nil { log.Warn("rebuild plan in EXECUTE statement failed", zap.String("labelName of PREPARE statement", e.name)) diff --git a/executor/simple.go b/executor/simple.go index f742e86844931..fb33fa81a82f6 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -398,9 +399,17 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { // reverts to its previous state. e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) // Call ctx.Txn(true) to active pending txn. 
- if _, err := e.ctx.Txn(true); err != nil { + pTxnConf := config.GetGlobalConfig().PessimisticTxn + if pTxnConf.Enable && (s.Pessimistic || pTxnConf.Default || e.ctx.GetSessionVars().PessimisticLock) { + e.ctx.GetSessionVars().TxnCtx.IsPessimistic = true + } + txn, err := e.ctx.Txn(true) + if err != nil { return err } + if e.ctx.GetSessionVars().TxnCtx.IsPessimistic { + txn.SetOption(kv.Pessimistic, true) + } return nil } diff --git a/go.mod b/go.mod index 70a6f40911b88..c210b8ad3f234 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/pingcap/errors v0.11.1 github.com/pingcap/failpoint v0.0.0-20190422094118-d8535965f59b github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d + github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669 diff --git a/go.sum b/go.sum index 288f6e6d080d2..c0f237d505cdc 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,9 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d h1:LJYJl+cBhkkTWD79n+n9Bp4agQ85SdF9YKY4zEtL9Kw= github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea h1:70nTOXpmTdRTrlEvVP+ZYexYe8RIgqyGibHmg0VLuoI= +github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/kv/kv.go b/kv/kv.go index 2a959e61df001..7e98844c2082f 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -45,6 +45,8 @@ const ( SyncLog // KeyOnly retrieve only keys, it can be used in scan now. KeyOnly + // Pessimistic is defined for pessimistic lock + Pessimistic ) // Priority value for transaction priority. @@ -132,7 +134,7 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. - LockKeys(keys ...Key) error + LockKeys(ctx context.Context, forUpdateTS uint64, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) @@ -153,6 +155,7 @@ type Transaction interface { SetAssertion(key Key, assertion AssertionType) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. BatchGet(keys []Key) (map[string][]byte, error) + IsPessimistic() bool } // Client is used to send request to KV layer. 
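The kv.Transaction interface above now takes a context and a for-update timestamp in LockKeys, and gains the Pessimistic option and IsPessimistic(). A minimal sketch of how a caller might drive the extended interface (illustrative only, not part of this diff; the helper name lockPessimistically is hypothetical):

package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
)

// lockPessimistically marks a transaction as pessimistic and locks the given
// keys under the supplied for-update timestamp. A zero forUpdateTS keeps the
// old behavior of merely recording the lock keys for commit time.
func lockPessimistically(ctx context.Context, txn kv.Transaction, forUpdateTS uint64, keys ...kv.Key) error {
	txn.SetOption(kv.Pessimistic, true)
	return txn.LockKeys(ctx, forUpdateTS, keys...)
}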
diff --git a/kv/mock.go b/kv/mock.go index a3fc01f4908a8..a595a0f74f61b 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -39,7 +39,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(keys ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ uint64, _ ...Key) error { return nil } @@ -135,6 +135,10 @@ func (s *mockStorage) Begin() (Transaction, error) { return tx, nil } +func (*mockTxn) IsPessimistic() bool { + return false +} + // BeginWithStartTS begins a transaction with startTS. func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) { return s.Begin() diff --git a/kv/mock_test.go b/kv/mock_test.go index 86e0e91141a3f..67b4193f7ea9f 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -37,7 +37,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(Key("lock")) + err = transaction.LockKeys(context.Background(), 0, Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/session/session.go b/session/session.go index 1935af30a6a55..fd97c34ffef38 100644 --- a/session/session.go +++ b/session/session.go @@ -394,8 +394,10 @@ func (s *session) doCommit(ctx context.Context) error { func (s *session) doCommitWithRetry(ctx context.Context) error { var txnSize int + var isPessimistic bool if s.txn.Valid() { txnSize = s.txn.Size() + isPessimistic = s.txn.IsPessimistic() } err := s.doCommit(ctx) if err != nil { @@ -412,7 +414,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { // Don't retry in BatchInsert mode. As a counter-example, insert into t1 select * from t2, // BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement, // Finally t1 will have more data than t2, with no errors return to user! - if s.isRetryableError(err) && !s.sessionVars.BatchInsert && commitRetryLimit > 0 { + if s.isRetryableError(err) && !s.sessionVars.BatchInsert && commitRetryLimit > 0 && !isPessimistic { logutil.Logger(ctx).Warn("sql", zap.String("label", s.getSQLLabel()), zap.Error(err), @@ -1220,6 +1222,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { return &s.txn, err } s.sessionVars.TxnCtx.StartTS = s.txn.StartTS() + if s.sessionVars.TxnCtx.IsPessimistic { + s.txn.SetOption(kv.Pessimistic, true) + } if !s.sessionVars.IsAutocommit() { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } @@ -1733,6 +1738,12 @@ func (s *session) PrepareTxnCtx(ctx context.Context) { SchemaVersion: is.SchemaMetaVersion(), CreateTime: time.Now(), } + if !s.sessionVars.IsAutocommit() { + txnConf := config.GetGlobalConfig().PessimisticTxn + if txnConf.Enable && (txnConf.Default || s.sessionVars.PessimisticLock) { + s.sessionVars.TxnCtx.IsPessimistic = true + } + } } // RefreshTxnCtx implements context.RefreshTxnCtx interface. diff --git a/session/session_test.go b/session/session_test.go index 155e10fdc4106..bb3827a2004fe 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2612,3 +2612,51 @@ func (s *testSessionSuite) TestTxnGoString(c *C) { tk.MustExec("rollback") c.Assert(fmt.Sprintf("%#v", txn), Equals, "Txn{state=invalid}") } + +func (s *testSessionSuite) TestPessimisticTxn(c *C) { + if !config.GetGlobalConfig().PessimisticTxn.Enable { + c.Skip("pessimistic transaction is not enabled") + } + tk := testkit.NewTestKitWithInit(c, s.store) + // Make the name has different indent for easier read. 
+ tk1 := testkit.NewTestKitWithInit(c, s.store) + + tk.MustExec("drop table if exists pessimistic") + tk.MustExec("create table pessimistic (k int, v int)") + tk.MustExec("insert into pessimistic values (1, 1)") + + // t1 lock, t2 update, t1 update and retry statement. + tk1.MustExec("begin lock") + + tk.MustExec("update pessimistic set v = 2 where v = 1") + + // Update can see the change, so this statement affects 0 rows. + tk1.MustExec("update pessimistic set v = 3 where v = 1") + c.Assert(tk1.Se.AffectedRows(), Equals, uint64(0)) + // select for update can see the change of another transaction. + tk1.MustQuery("select * from pessimistic for update").Check(testkit.Rows("1 2")) + // plain select cannot see the change of another transaction. + tk1.MustQuery("select * from pessimistic").Check(testkit.Rows("1 1")) + tk1.MustExec("update pessimistic set v = 3 where v = 2") + c.Assert(tk1.Se.AffectedRows(), Equals, uint64(1)) + + // pessimistic lock doesn't block read operations of other transactions. + tk.MustQuery("select * from pessimistic").Check(testkit.Rows("1 2")) + + tk1.MustExec("commit") + tk1.MustQuery("select * from pessimistic").Check(testkit.Rows("1 3")) + + // t1 lock, t1 select for update, t2 wait t1. + tk1.MustExec("begin lock") + tk1.MustExec("select * from pessimistic where k = 1 for update") + finishCh := make(chan struct{}) + go func() { + tk.MustExec("update pessimistic set v = 5 where k = 1") + finishCh <- struct{}{} + }() + time.Sleep(time.Millisecond * 10) + tk1.MustExec("update pessimistic set v = 3 where k = 1") + tk1.MustExec("commit") + <-finishCh + tk.MustQuery("select * from pessimistic").Check(testkit.Rows("1 5")) +} diff --git a/session/tidb.go b/session/tidb.go index 2be35eb4165b3..0863a1fc3f63c 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -163,6 +163,10 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV if !sessVars.InTxn() { logutil.Logger(context.Background()).Info("rollbackTxn for ddl/autocommit error.") se.RollbackTxn(ctx) + } else if se.txn.Valid() && se.txn.IsPessimistic() && strings.Contains(meetsErr.Error(), "deadlock") { + logutil.Logger(context.Background()).Info("rollbackTxn for deadlock error", zap.Uint64("txn", se.txn.StartTS())) + meetsErr = errDeadlock + se.RollbackTxn(ctx) } return meetsErr } @@ -324,15 +328,18 @@ func IsQuery(sql string) bool { var ( errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry, mysql.MySQLErrName[mysql.ErrForUpdateCantRetry]) + errDeadlock = terror.ClassSession.New(codeDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock]) ) const ( codeForUpdateCantRetry terror.ErrCode = mysql.ErrForUpdateCantRetry + codeDeadlock terror.ErrCode = mysql.ErrLockDeadlock ) func init() { sessionMySQLErrCodes := map[terror.ErrCode]uint16{ codeForUpdateCantRetry: mysql.ErrForUpdateCantRetry, + codeDeadlock: mysql.ErrLockDeadlock, } terror.ErrClassToMySQLCodes[terror.ClassSession] = sessionMySQLErrCodes } diff --git a/session/txn.go b/session/txn.go index b646e5df00d11..4942b6710cae8 100644 --- a/session/txn.go +++ b/session/txn.go @@ -14,6 +14,7 @@ package session import ( + "bytes" "context" "fmt" "strings" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-binlog" @@ -305,6 +307,45 @@ func (st *TxnState) cleanup() { } } +// KeysNeedToLock
returns the keys that need to be locked. +func (st *TxnState) KeysNeedToLock() ([]kv.Key, error) { + keys := make([]kv.Key, 0, st.buf.Len()) + if err := kv.WalkMemBuffer(st.buf, func(k kv.Key, v []byte) error { + if !keyNeedToLock(k, v) { + return nil + } + if mb := st.Transaction.GetMemBuffer(); mb != nil { + _, err1 := mb.Get(k) + if err1 == nil { + // Key is already in the txn MemBuffer, so it must already be locked; we don't need to lock it again. + return nil + } + } + // The statement MemBuffer will be reused, so we must copy the key here. + keys = append(keys, append([]byte{}, k...)) + return nil + }); err != nil { + return nil, err + } + return keys, nil +} + +func keyNeedToLock(k, v []byte) bool { + isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix()) + if !isTableKey { + // meta keys always need to be locked. + return true + } + isDelete := len(v) == 0 + if isDelete { + // for a delete, only the row key needs to be locked. + return k[10] == 'r' + } + isNonUniqueIndex := len(v) == 1 && v[0] == '0' + // Put on a row key or a unique index key needs to be locked. + return !isNonUniqueIndex +} + func getBinlogMutation(ctx sessionctx.Context, tableID int64) *binlog.TableMutation { bin := binloginfo.GetPrewriteValue(ctx, true) for i := range bin.Mutations { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2cd81c4a86a1e..fdf88be869835 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -100,6 +100,7 @@ func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) { // TransactionContext is used to store variables that has transaction scope. type TransactionContext struct { ForUpdate bool + forUpdateTS uint64 DirtyDB interface{} Binlog interface{} InfoSchema interface{} @@ -108,6 +109,7 @@ type TransactionContext struct { StartTS uint64 Shard *int64 TableDeltaMap map[int64]TableDelta + IsPessimistic bool // For metrics. CreateTime time.Time @@ -145,6 +147,21 @@ func (tc *TransactionContext) ClearDelta() { tc.TableDeltaMap = nil } +// GetForUpdateTS returns the ts for update. +func (tc *TransactionContext) GetForUpdateTS() uint64 { + if tc.forUpdateTS > tc.StartTS { + return tc.forUpdateTS + } + return tc.StartTS +} + +// SetForUpdateTS sets the ts for update. +func (tc *TransactionContext) SetForUpdateTS(forUpdateTS uint64) { + if forUpdateTS > tc.forUpdateTS { + tc.forUpdateTS = forUpdateTS + } +} + // WriteStmtBufs can be used by insert/replace/delete/update statement. // TODO: use a common memory pool to replace this. type WriteStmtBufs struct { @@ -356,6 +373,9 @@ type SessionVars struct { // EnableFastAnalyze indicates whether to take fast analyze. EnableFastAnalyze bool + + // PessimisticLock indicates whether a new transaction should be pessimistic. + PessimisticLock bool } // ConnectionInfo present connection used by audit.
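GetForUpdateTS and SetForUpdateTS above keep the for-update timestamp monotonic within a transaction and fall back to the start timestamp until a larger value is observed. A small worked example of the expected behavior (illustrative only, not part of this diff):

package example

import "github.com/pingcap/tidb/sessionctx/variable"

func forUpdateTSExample() {
	tc := &variable.TransactionContext{StartTS: 100}

	_ = tc.GetForUpdateTS() // 100: no for-update ts recorded yet, falls back to StartTS
	tc.SetForUpdateTS(120)  // advances the for-update ts
	_ = tc.GetForUpdateTS() // 120
	tc.SetForUpdateTS(110)  // ignored: smaller than the current for-update ts
	_ = tc.GetForUpdateTS() // still 120
}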
@@ -771,6 +791,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableFastAnalyze = TiDBOptOn(val) case TiDBWaitTableSplitFinish: s.WaitTableSplitFinish = TiDBOptOn(val) + case TiDBPessimisticLock: + s.PessimisticLock = TiDBOptOn(val) } s.systems[name] = val return nil } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index fd32280801da8..d419607a5aacb 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -674,6 +674,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)}, {ScopeGlobal | ScopeSession, TiDBDisableTxnAutoRetry, BoolToIntStr(DefTiDBDisableTxnAutoRetry)}, {ScopeGlobal | ScopeSession, TiDBConstraintCheckInPlace, BoolToIntStr(DefTiDBConstraintCheckInPlace)}, + {ScopeSession, TiDBPessimisticLock, strconv.Itoa(DefTiDBPessimisticLock)}, {ScopeSession, TiDBOptimizerSelectivityLevel, strconv.Itoa(DefTiDBOptimizerSelectivityLevel)}, {ScopeGlobal | ScopeSession, TiDBEnableWindowFunction, BoolToIntStr(DefEnableWindowFunction)}, {ScopeGlobal | ScopeSession, TiDBEnableFastAnalyze, BoolToIntStr(DefTiDBUseFastAnalyze)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 949591d4810eb..b8ae7498ec1cb 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -122,6 +122,9 @@ const ( // tidb_optimizer_selectivity_level is used to control the selectivity estimation level. TiDBOptimizerSelectivityLevel = "tidb_optimizer_selectivity_level" + // tidb_pessimistic_lock is used to control whether new transactions use pessimistic locking. + TiDBPessimisticLock = "tidb_pessimistic_lock" + // tidb_enable_table_partition is used to control table partition feature. // The valid value include auto/on/off: // auto: enable table partition when that feature is implemented. @@ -311,6 +314,7 @@ const ( DefTiDBHashJoinConcurrency = 5 DefTiDBProjectionConcurrency = 4 DefTiDBOptimizerSelectivityLevel = 0 + DefTiDBPessimisticLock = 0 DefTiDBDDLReorgWorkerCount = 16 DefTiDBDDLReorgBatchSize = 1024 DefTiDBDDLErrorCountLimit = 512 diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 78dd9a46f298f..1e1d66ff0152a 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -20,6 +20,7 @@ import ( .
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/util" ) func TestT(t *testing.T) { @@ -512,7 +513,7 @@ func (s *testMockTiKVSuite) TestDeleteRange(c *C) { func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) { c.Assert(errs[i], NotNil) - c.Assert(strings.Contains(errs[i].Error(), writeConflictMarker), IsTrue) + c.Assert(strings.Contains(errs[i].Error(), util.WriteConflictMarker), IsTrue) } func (s *testMockTiKVSuite) TestRC(c *C) { diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 9dc992f1e0134..18d154e642a83 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -23,6 +23,7 @@ import ( "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" ) @@ -34,8 +35,6 @@ const ( typeRollback ) -const writeConflictMarker = "write conflict" - type mvccValue struct { valueType mvccValueType startTS uint64 @@ -201,7 +200,8 @@ func (l *mvccLock) lockErr(key []byte) error { func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) { // ignore when ts is older than lock or lock's type is Lock. - if l.startTS > ts || l.op == kvrpcpb.Op_Lock { + // Pessimistic lock doesn't block read. + if l.startTS > ts || l.op == kvrpcpb.Op_Lock || l.op == kvrpcpb.Op_PessimisticLock { return ts, nil } // for point get latest version. @@ -257,7 +257,7 @@ func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, err func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error { if len(e.values) > 0 { if e.values[0].commitTS >= startTS { - return ErrRetryable(writeConflictMarker) + return ErrRetryable(util.WriteConflictMarker) } } if e.lock != nil { @@ -425,6 +425,7 @@ type MVCCStore interface { Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair + PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error Commit(keys [][]byte, startTS, commitTS uint64) error Rollback(keys [][]byte, startTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 8d33ffe3af3aa..5d6c79c10ff17 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/goleveldb/leveldb/util" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" + tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -498,6 +499,71 @@ func reverse(values []mvccValue) { } } +// PessimisticLock writes the pessimistic lock. 
+func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + anyError := false + batch := &leveldb.Batch{} + errs := make([]error, 0, len(mutations)) + for _, m := range mutations { + err := pessimisticLockMutation(mvcc.db, batch, m, startTS, forUpdateTS, primary, ttl) + errs = append(errs, err) + if err != nil { + anyError = true + } + } + if anyError { + return errs + } + if err := mvcc.db.Write(batch, nil); err != nil { + return nil + } + + return errs +} + +func pessimisticLockMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS, forUpdateTS uint64, primary []byte, ttl uint64) error { + startKey := mvccEncode(mutation.Key, lockVer) + iter := newIterator(db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + dec := lockDecoder{ + expectKey: mutation.Key, + } + ok, err := dec.Decode(iter) + if err != nil { + return errors.Trace(err) + } + if ok { + if dec.lock.startTS != startTS { + return dec.lock.lockErr(mutation.Key) + } + return nil + } + if err = checkConflictValue(iter, mutation.Key, forUpdateTS); err != nil { + return err + } + + lock := mvccLock{ + startTS: startTS, + primary: primary, + op: kvrpcpb.Op_PessimisticLock, + ttl: ttl, + } + writeKey := mvccEncode(mutation.Key, lockVer) + writeValue, err := lock.MarshalBinary() + if err != nil { + return errors.Trace(err) + } + + batch.Put(writeKey, writeValue) + return nil +} + // Prewrite implements the MVCCStore interface. func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error { mvcc.mu.Lock() @@ -541,6 +607,21 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, return errs } +func checkConflictValue(iter *Iterator, key []byte, startTS uint64) error { + dec := valueDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return errors.Trace(err) + } + // Note that it's a write conflict here, even if the value is a rollback one. + if ok && dec.value.commitTS >= startTS { + return ErrRetryable(tidbutil.WriteConflictMarker) + } + return nil +} + func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error { startKey := mvccEncode(mutation.Key, lockVer) iter := newIterator(db, &util.Range{ @@ -559,7 +640,15 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu if dec.lock.startTS != startTS { return dec.lock.lockErr(mutation.Key) } - return nil + if dec.lock.op != kvrpcpb.Op_PessimisticLock { + return nil + } + // Overwrite the pessimistic lock. + } else { + err = checkConflictValue(iter, mutation.Key, startTS) + if err != nil { + return err + } } dec1 := valueDecoder{ @@ -571,7 +660,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu } // Note that it's a write conflict here, even if the value is a rollback one. 
if ok && dec1.value.commitTS >= startTS { - return ErrRetryable(writeConflictMarker) + return ErrRetryable(tidbutil.WriteConflictMarker) } op := mutation.GetOp() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 20ef0e3cae011..edfc4fdfbc4fd 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -261,6 +261,18 @@ func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Pre } } +func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest) *kvrpcpb.PessimisticLockResponse { + for _, m := range req.Mutations { + if !h.checkKeyInRegion(m.Key) { + panic("KvPrewrite: key not in region") + } + } + errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl()) + return &kvrpcpb.PessimisticLockResponse{ + Errors: convertToKeyErrors(errs), + } +} + func (h *rpcHandler) handleKvCommit(req *kvrpcpb.CommitRequest) *kvrpcpb.CommitResponse { for _, k := range req.Keys { if !h.checkKeyInRegion(k) { @@ -616,6 +628,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.Prewrite = handler.handleKvPrewrite(r) + case tikvrpc.CmdPessimisticLock: + r := req.PessimisticLock + if err := handler.checkRequest(reqCtx, r.Size()); err != nil { + resp.PessimisticLock = &kvrpcpb.PessimisticLockResponse{RegionError: err} + return resp, nil + } + resp.PessimisticLock = handler.handleKvPessimisticLock(r) case tikvrpc.CmdCommit: failpoint.Inject("rpcCommitResult", func(val failpoint.Value) { switch val.(string) { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f4029af0dccff..057b38db312c4 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -16,6 +16,7 @@ package tikv import ( "bytes" "context" + "fmt" "math" "sync" "sync/atomic" @@ -40,9 +41,10 @@ import ( type twoPhaseCommitAction int const ( - actionPrewrite twoPhaseCommitAction = 1 - actionCommit twoPhaseCommitAction = 2 - actionCleanup twoPhaseCommitAction = 3 + actionPrewrite twoPhaseCommitAction = 1 + iota + actionCommit + actionCleanup + actionPessimisticLock ) var ( @@ -58,6 +60,8 @@ func (ca twoPhaseCommitAction) String() string { return "commit" case actionCleanup: return "cleanup" + case actionPessimisticLock: + return "pessimistic_lock" } return "unknown" } @@ -90,15 +94,29 @@ type twoPhaseCommitter struct { // should be less than GC life time. maxTxnTimeUse uint64 detail *execdetails.CommitDetails + // For pessimistic transaction + primaryKey []byte + forUpdateTS uint64 + isFirstLock bool } type mutationEx struct { pb.Mutation - asserted bool + asserted bool + isPessimisticLock bool } // newTwoPhaseCommitter creates a twoPhaseCommitter. 
func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) { + return &twoPhaseCommitter{ + store: txn.store, + txn: txn, + startTS: txn.StartTS(), + connID: connID, + }, nil +} + +func (c *twoPhaseCommitter) initKeysAndMutations() error { var ( keys [][]byte size int @@ -107,6 +125,18 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro lockCnt int ) mutations := make(map[string]*mutationEx) + txn := c.txn + isPessimistic := txn.IsPessimistic() + if isPessimistic && len(c.primaryKey) > 0 { + keys = append(keys, c.primaryKey) + mutations[string(c.primaryKey)] = &mutationEx{ + Mutation: pb.Mutation{ + Op: pb.Op_Lock, + Key: c.primaryKey, + }, + isPessimisticLock: true, + } + } err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error { if len(v) > 0 { op := pb.Op_Put @@ -130,7 +160,13 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro } delCnt++ } - keys = append(keys, k) + if isPessimistic { + if !bytes.Equal(k, c.primaryKey) { + keys = append(keys, k) + } + } else { + keys = append(keys, k) + } entrySize := len(k) + len(v) if entrySize > kv.TxnEntrySizeLimit { return kv.ErrEntryTooLarge.GenWithStackByArgs(kv.TxnEntrySizeLimit, entrySize) @@ -139,30 +175,33 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro return nil }) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } for _, lockKey := range txn.lockKeys { - if _, ok := mutations[string(lockKey)]; !ok { + muEx, ok := mutations[string(lockKey)] + if !ok { mutations[string(lockKey)] = &mutationEx{ Mutation: pb.Mutation{ Op: pb.Op_Lock, Key: lockKey, }, + isPessimisticLock: isPessimistic, } lockCnt++ keys = append(keys, lockKey) size += len(lockKey) + } else { + muEx.isPessimisticLock = isPessimistic } } if len(keys) == 0 { - return nil, nil + return nil } for _, pair := range txn.assertions { mutation, ok := mutations[string(pair.key)] if !ok { - logutil.Logger(context.Background()).Error("ASSERTION FAIL!!! assertion exists but no mutation?", - zap.Stringer("assertion", pair)) + // It's possible when a transaction inserted a key then deleted it later. continue } // Only apply the first assertion! 
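For reference, the rewritten twoPhaseCommitAction constants earlier in this file rely on 1 + iota, so the existing actions keep their values and the new pessimistic-lock action takes the next one. A worked expansion (illustrative only, not part of this diff):

package example

const (
	actionPrewrite = 1 + iota // 1, unchanged
	actionCommit              // 2, unchanged
	actionCleanup             // 3, unchanged
	actionPessimisticLock     // 4, new in this change
)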
@@ -182,14 +221,14 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro entrylimit := atomic.LoadUint64(&kv.TxnEntryCountLimit) if len(keys) > int(entrylimit) || size > kv.TxnTotalSizeLimit { - return nil, kv.ErrTxnTooLarge + return kv.ErrTxnTooLarge } const logEntryCount = 10000 const logSize = 4 * 1024 * 1024 // 4MB if len(keys) > logEntryCount || size > logSize { tableID := tablecodec.DecodeTableID(keys[0]) logutil.Logger(context.Background()).Info("[BIG_TXN]", - zap.Uint64("con", connID), + zap.Uint64("con", c.connID), zap.Int64("table ID", tableID), zap.Int("size", size), zap.Int("keys", len(keys)), @@ -206,31 +245,29 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro if txn.StartTS() == math.MaxUint64 { err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS()) logutil.Logger(context.Background()).Error("commit failed", - zap.Uint64("conn", connID), + zap.Uint64("conn", c.connID), zap.Error(err)) - return nil, errors.Trace(err) + return errors.Trace(err) } commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)} metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) - return &twoPhaseCommitter{ - store: txn.store, - txn: txn, - startTS: txn.StartTS(), - keys: keys, - mutations: mutations, - lockTTL: txnLockTTL(txn.startTime, size), - priority: getTxnPriority(txn), - syncLog: getTxnSyncLog(txn), - connID: connID, - maxTxnTimeUse: maxTxnTimeUse, - detail: commitDetail, - }, nil + c.maxTxnTimeUse = maxTxnTimeUse + c.keys = keys + c.mutations = mutations + c.lockTTL = txnLockTTL(txn.startTime, size) + c.priority = getTxnPriority(txn) + c.syncLog = getTxnSyncLog(txn) + c.detail = commitDetail + return nil } func (c *twoPhaseCommitter) primary() []byte { - return c.keys[0] + if len(c.primaryKey) == 0 { + return c.keys[0] + } + return c.primaryKey } const bytesPerMiB = 1024 * 1024 @@ -330,6 +367,8 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm singleBatchActionFunc = c.commitSingleBatch case actionCleanup: singleBatchActionFunc = c.cleanupSingleBatch + case actionPessimisticLock: + singleBatchActionFunc = c.pessimisticLockSingleBatch } if len(batches) == 1 { e := singleBatchActionFunc(bo, batches[0]) @@ -412,18 +451,26 @@ func (c *twoPhaseCommitter) keySize(key []byte) int { func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) + var isPessimisticLock []bool for i, k := range batch.keys { tmp := c.mutations[string(k)] mutations[i] = &tmp.Mutation + if tmp.isPessimisticLock { + if len(isPessimisticLock) == 0 { + isPessimisticLock = make([]bool, len(mutations)) + } + isPessimisticLock[i] = true + } } req := &tikvrpc.Request{ Type: tikvrpc.CmdPrewrite, Prewrite: &pb.PrewriteRequest{ - Mutations: mutations, - PrimaryLock: c.primary(), - StartVersion: c.startTS, - LockTtl: c.lockTTL, + Mutations: mutations, + PrimaryLock: c.primary(), + StartVersion: c.startTS, + LockTtl: c.lockTTL, + IsPessimisticLock: isPessimisticLock, }, Context: pb.Context{ Priority: c.priority, @@ -495,6 +542,92 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } } +func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { + mutations := make([]*pb.Mutation, len(batch.keys)) + for i, k := range batch.keys { + mut := &pb.Mutation{ 
+ Op: pb.Op_PessimisticLock, + Key: k, + } + conditionPair := c.txn.us.LookupConditionPair(k) + if conditionPair != nil && conditionPair.ShouldNotExist() { + mut.Assertion = pb.Assertion_NotExist + } + mutations[i] = mut + } + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdPessimisticLock, + PessimisticLock: &pb.PessimisticLockRequest{ + Mutations: mutations, + PrimaryLock: c.primary(), + StartVersion: c.startTS, + ForUpdateTs: c.forUpdateTS, + LockTtl: config.GetGlobalConfig().PessimisticTxn.TTL, + IsFirstLock: c.isFirstLock, + }, + Context: pb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + }, + } + for { + resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) + if err != nil { + return errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return errors.Trace(err) + } + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + err = c.pessimisticLockKeys(bo, batch.keys) + return errors.Trace(err) + } + lockResp := resp.PessimisticLock + if lockResp == nil { + return errors.Trace(ErrBodyMissing) + } + keyErrs := lockResp.GetErrors() + if len(keyErrs) == 0 { + return nil + } + var locks []*Lock + for _, keyErr := range keyErrs { + // Check already exists error + if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { + key := alreadyExist.GetKey() + conditionPair := c.txn.us.LookupConditionPair(key) + if conditionPair == nil { + panic(fmt.Sprintf("con:%d, conditionPair for key:%s should not be nil", c.connID, key)) + } + return errors.Trace(conditionPair.Err()) + } + + // Extract lock from key error + lock, err1 := extractLockFromKeyErr(keyErr) + if err1 != nil { + return errors.Trace(err1) + } + locks = append(locks, lock) + } + ok, err := c.store.lockResolver.ResolveLocks(bo, locks) + if err != nil { + return errors.Trace(err) + } + if !ok { + err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + if err != nil { + return errors.Trace(err) + } + } + } +} + func getTxnPriority(txn *tikvTxn) pb.CommandPri { if pri := txn.us.GetOption(kv.Priority); pri != nil { return kvPriorityToCommandPri(pri.(int)) @@ -661,6 +794,10 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { return c.doActionOnKeys(bo, actionCleanup, keys) } +func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock, keys) +} + func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { err := c.execute(ctx) if err != nil { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 24a0e788ed8a5..820f78cd59e3f 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -133,7 +133,7 @@ func (s *testCommitterSuite) TestPrewriteRollback(c *C) { c.Assert(err, IsNil) err = txn1.Set([]byte("b"), []byte("b1")) c.Assert(err, IsNil) - committer, err := newTwoPhaseCommitter(txn1, 0) + committer, err := newTwoPhaseCommitterWithInit(txn1, 0) c.Assert(err, IsNil) err = committer.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), committer.keys) c.Assert(err, IsNil) @@ -151,7 +151,7 @@ func (s *testCommitterSuite) TestPrewriteRollback(c *C) { c.Assert(err, IsNil) err = txn1.Set([]byte("b"), []byte("b1")) c.Assert(err, IsNil) - committer, err = newTwoPhaseCommitter(txn1, 0) + committer, err = newTwoPhaseCommitterWithInit(txn1, 0) c.Assert(err, IsNil) err = committer.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), 
committer.keys) c.Assert(err, IsNil) @@ -173,7 +173,7 @@ func (s *testCommitterSuite) TestContextCancel(c *C) { c.Assert(err, IsNil) err = txn1.Set([]byte("b"), []byte("b1")) c.Assert(err, IsNil) - committer, err := newTwoPhaseCommitter(txn1, 0) + committer, err := newTwoPhaseCommitterWithInit(txn1, 0) c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), prewriteMaxBackoff) @@ -203,7 +203,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) { // txn1 locks "b" err := txn1.Set([]byte("b"), []byte("b1")) c.Assert(err, IsNil) - committer, err := newTwoPhaseCommitter(txn1, 0) + committer, err := newTwoPhaseCommitterWithInit(txn1, 0) c.Assert(err, IsNil) err = committer.prewriteKeys(NewBackoffer(context.Background(), prewriteMaxBackoff), committer.keys) c.Assert(err, IsNil) @@ -326,11 +326,22 @@ func errMsgMustContain(c *C, err error, msg string) { c.Assert(strings.Contains(err.Error(), msg), IsTrue) } +func newTwoPhaseCommitterWithInit(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) { + c, err := newTwoPhaseCommitter(txn, connID) + if err != nil { + return nil, errors.Trace(err) + } + if err = c.initKeysAndMutations(); err != nil { + return nil, errors.Trace(err) + } + return c, nil +} + func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) { txn := s.begin(c) err := txn.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) - commiter, err := newTwoPhaseCommitter(txn, 0) + commiter, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) ctx := context.Background() err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys) @@ -375,7 +386,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { // clean again, shouldn't be failed when a rollback already exist. ctx := context.Background() - commiter, err := newTwoPhaseCommitter(txn2, 0) + commiter, err := newTwoPhaseCommitterWithInit(txn2, 0) c.Assert(err, IsNil) err = commiter.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), commiter.keys) c.Assert(err, IsNil) @@ -410,13 +421,13 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) { txn1 := s.begin(c) txn2 := s.begin(c) txn2.Set([]byte("x1"), []byte("1")) - commiter2, err := newTwoPhaseCommitter(txn2, 2) + commiter2, err := newTwoPhaseCommitterWithInit(txn2, 2) c.Assert(err, IsNil) err = commiter2.execute(context.Background()) c.Assert(err, IsNil) txn1.Set([]byte("x1"), []byte("1")) txn1.Set([]byte("y1"), []byte("2")) - commiter1, err := newTwoPhaseCommitter(txn1, 2) + commiter1, err := newTwoPhaseCommitterWithInit(txn1, 2) c.Assert(err, IsNil) err = commiter1.execute(context.Background()) c.Assert(err, NotNil) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 24fa1c68fa391..d507ecaa800da 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -14,6 +14,7 @@ package tikv import ( + "bytes" "context" "math" "runtime" @@ -55,9 +56,13 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt err = txn.Delete(primaryKey) } c.Assert(err, IsNil) - tpc, err := newTwoPhaseCommitter(txn, 0) + tpc, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) - tpc.keys = [][]byte{primaryKey, key} + if bytes.Equal(key, primaryKey) { + tpc.keys = [][]byte{primaryKey} + } else { + tpc.keys = [][]byte{primaryKey, key} + } ctx := context.Background() err = tpc.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), tpc.keys) @@ -198,7 +203,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { } func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { - 
committer, err := newTwoPhaseCommitter(txn, 0) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) err = committer.prewriteKeys(NewBackoffer(context.Background(), prewriteMaxBackoff), committer.keys) c.Assert(err, IsNil) diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 115d19c41bec7..220a100b96b34 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -205,6 +205,9 @@ func (s *mockTikvGrpcServer) KvScanLock(context.Context, *kvrpcpb.ScanLockReques func (s *mockTikvGrpcServer) KvResolveLock(context.Context, *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { + return nil, errors.New("unreachable") +} func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { return nil, errors.New("unreachable") } @@ -271,6 +274,10 @@ func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) erro return errors.New("unreachable") } +func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) { // prepare a mock tikv grpc server addr := "localhost:56341" diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 2fdf6a2bb93f8..19f6f6ded07f7 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn := s.beginTxn(c) err := txn.Set(encodeKey(s.prefix, "key"), []byte("value")) c.Assert(err, IsNil) - err = txn.LockKeys(encodeKey(s.prefix, "key")) + err = txn.LockKeys(context.Background(), 0, encodeKey(s.prefix, "key")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 4d22ade260974..e62a153410a61 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -44,6 +44,7 @@ const ( CmdResolveLock CmdGC CmdDeleteRange + CmdPessimisticLock CmdRawGet CmdType = 256 + iota CmdRawBatchGet @@ -74,6 +75,8 @@ func (t CmdType) String() string { return "Scan" case CmdPrewrite: return "Prewrite" + case CmdPessimisticLock: + return "PessimisticLock" case CmdCommit: return "Commit" case CmdCleanup: @@ -131,6 +134,7 @@ type Request struct { Get *kvrpcpb.GetRequest Scan *kvrpcpb.ScanRequest Prewrite *kvrpcpb.PrewriteRequest + PessimisticLock *kvrpcpb.PessimisticLockRequest Commit *kvrpcpb.CommitRequest Cleanup *kvrpcpb.CleanupRequest BatchGet *kvrpcpb.BatchGetRequest @@ -199,6 +203,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_RawScan{RawScan: req.RawScan}} case CmdCop: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: req.Cop}} + case CmdPessimisticLock: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticLock{PessimisticLock: req.PessimisticLock}} } return nil } @@ -218,6 +224,7 @@ type Response struct { Get *kvrpcpb.GetResponse Scan *kvrpcpb.ScanResponse Prewrite *kvrpcpb.PrewriteResponse + PessimisticLock 
*kvrpcpb.PessimisticLockResponse Commit *kvrpcpb.CommitResponse Cleanup *kvrpcpb.CleanupResponse BatchGet *kvrpcpb.BatchGetResponse @@ -287,6 +294,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Type: CmdRawScan, RawScan: res.RawScan} case *tikvpb.BatchCommandsResponse_Response_Coprocessor: return &Response{Type: CmdCop, Cop: res.Coprocessor} + case *tikvpb.BatchCommandsResponse_Response_PessimisticLock: + return &Response{Type: CmdPessimisticLock, PessimisticLock: res.PessimisticLock} } return nil } @@ -315,6 +324,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.Scan.Context = ctx case CmdPrewrite: req.Prewrite.Context = ctx + case CmdPessimisticLock: + req.PessimisticLock.Context = ctx case CmdCommit: req.Commit.Context = ctx case CmdCleanup: @@ -383,6 +394,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { resp.Prewrite = &kvrpcpb.PrewriteResponse{ RegionError: e, } + case CmdPessimisticLock: + resp.PessimisticLock = &kvrpcpb.PessimisticLockResponse{ + RegionError: e, + } case CmdCommit: resp.Commit = &kvrpcpb.CommitResponse{ RegionError: e, @@ -487,6 +502,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { e = resp.Get.GetRegionError() case CmdScan: e = resp.Scan.GetRegionError() + case CmdPessimisticLock: + e = resp.PessimisticLock.GetRegionError() case CmdPrewrite: e = resp.Prewrite.GetRegionError() case CmdCommit: @@ -553,6 +570,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Scan, err = client.KvScan(ctx, req.Scan) case CmdPrewrite: resp.Prewrite, err = client.KvPrewrite(ctx, req.Prewrite) + case CmdPessimisticLock: + resp.PessimisticLock, err = client.KvPessimisticLock(ctx, req.PessimisticLock) case CmdCommit: resp.Commit, err = client.KvCommit(ctx, req.Commit) case CmdCleanup: diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 026e3535cce04..27c931602234a 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -62,6 +62,7 @@ type tikvTxn struct { dirty bool setCnt int64 vars *kv.Variables + committer *twoPhaseCommitter // For data consistency check. assertions []assertionPair @@ -229,6 +230,10 @@ func (txn *tikvTxn) DelOption(opt kv.Option) { txn.us.DelOption(opt) } +func (txn *tikvTxn) IsPessimistic() bool { + return txn.us.GetOption(kv.Pessimistic) != nil +} + func (txn *tikvTxn) Commit(ctx context.Context) error { if !txn.valid { return kv.ErrInvalidTxn @@ -253,10 +258,23 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if val != nil { connID = val.(uint64) } - committer, err := newTwoPhaseCommitter(txn, connID) - if err != nil || committer == nil { + + var err error + // If the txn use pessimistic lock, committer is initialized. + committer := txn.committer + if committer == nil { + committer, err = newTwoPhaseCommitter(txn, connID) + if err != nil { + return errors.Trace(err) + } + } + if err := committer.initKeysAndMutations(); err != nil { return errors.Trace(err) } + if len(committer.keys) == 0 { + return nil + } + defer func() { ctxValue := ctx.Value(execdetails.CommitDetailCtxKey) if ctxValue != nil { @@ -269,7 +287,8 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { } }() // latches disabled - if txn.store.txnLatches == nil { + // pessimistic transaction should also bypass latch. 
+ if txn.store.txnLatches == nil || txn.IsPessimistic() { err = committer.executeAndWriteFinishBinlog(ctx) logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err)) return errors.Trace(err) @@ -301,6 +320,14 @@ func (txn *tikvTxn) close() { } func (txn *tikvTxn) Rollback() error { + // Clean up pessimistic lock. + if txn.IsPessimistic() && txn.committer != nil { + err := txn.rollbackPessimisticLock() + if err != nil { + logutil.Logger(context.Background()).Error(err.Error()) + } + } + if !txn.valid { return kv.ErrInvalidTxn } @@ -311,8 +338,53 @@ func (txn *tikvTxn) Rollback() error { return nil } -func (txn *tikvTxn) LockKeys(keys ...kv.Key) error { +func (txn *tikvTxn) rollbackPessimisticLock() error { + c := txn.committer + if err := c.initKeysAndMutations(); err != nil { + return errors.Trace(err) + } + if len(c.keys) == 0 { + return nil + } + + return c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), c.keys) +} + +func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv.Key) error { + if len(keys) == 0 { + return nil + } tikvTxnCmdHistogramWithLockKeys.Inc() + if txn.IsPessimistic() && forUpdateTS > 0 { + if txn.committer == nil { + // connID is used for log. + var connID uint64 + var err error + val := ctx.Value(sessionctx.ConnID) + if val != nil { + connID = val.(uint64) + } + txn.committer, err = newTwoPhaseCommitter(txn, connID) + if err != nil { + return err + } + txn.committer.primaryKey = keys[0] + } + + bo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(txn.vars) + keys1 := make([][]byte, len(keys)) + for i, key := range keys { + keys1[i] = key + } + txn.committer.forUpdateTS = forUpdateTS + // If the number of keys1 greater than 1, it can be on different region, + // concurrently execute on multiple regions may lead to deadlock. + txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys1) == 1 + err := txn.committer.pessimisticLockKeys(bo, keys1) + if err != nil { + return err + } + } txn.mu.Lock() for _, key := range keys { txn.lockKeys = append(txn.lockKeys, key) diff --git a/util/misc.go b/util/misc.go index 15ecad2b7e296..43d796b91c823 100644 --- a/util/misc.go +++ b/util/misc.go @@ -33,6 +33,8 @@ const ( RetryInterval uint64 = 500 // GCTimeFormat is the format that gc_worker used to store times. GCTimeFormat = "20060102-15:04:05 -0700" + // WriteConflictMarker is used when transaction writing is conflicted. + WriteConflictMarker = "write conflict" ) // RunWithRetry will run the f with backoff and retry.
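Putting the pieces together: once the [pessimistic-txn] config block has enable = true, a session can start a pessimistic transaction with an explicit BEGIN LOCK, via default = true in the config, or by setting the tidb_pessimistic_lock session variable. A usage sketch in the testkit style of TestPessimisticTxn above (illustrative only, not part of this diff; the test name and table t are hypothetical):

func (s *testSessionSuite) TestPessimisticTxnEntryPoints(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)
	tk.MustExec("create table t (k int primary key, v int)")
	tk.MustExec("insert into t values (1, 1)")

	// 1. Explicit statement: only requires enable = true in the config.
	tk.MustExec("begin lock")
	tk.MustExec("update t set v = v + 1 where k = 1") // retried on write conflict instead of failing at commit
	tk.MustExec("commit")

	// 2. Session variable: a plain BEGIN then starts a pessimistic transaction.
	tk.MustExec("set tidb_pessimistic_lock = 1")
	tk.MustExec("begin")
	tk.MustExec("select * from t where k = 1 for update") // acquires pessimistic locks on the selected rows
	tk.MustExec("rollback")
}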