Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support pessimistic transaction (experimental feature) #10297

Merged
merged 13 commits into from
May 11, 2019
19 changes: 19 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic_txn"`
CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
// TreatOldVersionUTF8AsUTF8MB4 is use to treat old version table/column UTF8 charset as UTF8MB4. This is for compatibility.
// Currently not support dynamic modify, because this need to reload all old version schema.
Expand Down Expand Up @@ -285,6 +286,18 @@ type Plugin struct {
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// Enable must be true for 'begin lock' or session variable to start a pessimistic transaction.
Enable bool `toml:"enable" json:"enable"`
// Starts a pessimistic transaction by default when Enable is true.
Default bool `toml:"default" json:"default"`
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock ttl in milliseconds.
TTL uint64 `toml:"ttl" json:"ttl"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
Expand Down Expand Up @@ -368,6 +381,12 @@ var defaultConf = Config{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
Enable: false,
Default: false,
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
MaxRetryCount: 256,
TTL: 60 * 1000,
},
}

var globalConf = defaultConf
Expand Down
15 changes: 14 additions & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,17 @@ ignore-error = false
binlog-socket = ""

# the strategy for sending binlog to pump, value can be "range" or "hash" now.
strategy = "range"
strategy = "range"

[pessimistic-txn]
# enable pessimistic transaction.
enable = false

# start pessimistic transaction by default.
default = false

# max retry count for a statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for pessimistic lock.
ttl = 60000
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(idxRecord.key)
err := txn.LockKeys(context.Background(), 0, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
208 changes: 201 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -151,8 +153,9 @@ type ExecStmt struct {

Ctx sessionctx.Context
// StartTime stands for the starting time when executing the statement.
StartTime time.Time
isPreparedStmt bool
StartTime time.Time
isPreparedStmt bool
isSelectForUpdate bool
}

// OriginText returns original statement as a string.
Expand Down Expand Up @@ -246,8 +249,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handle for "select for update statement" in pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
return a.handlePessimisticSelectForUpdate(ctx, sctx, e)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
if isPessimistic {
return nil, a.handlePessimisticDML(ctx, sctx, e)
}
return a.handleNoDelayExecutor(ctx, sctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
Expand All @@ -257,9 +270,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}

var txnStartTS uint64
txn, err1 := sctx.Txn(false)
if err1 != nil {
return nil, err
txn, err := sctx.Txn(false)
if err != nil {
return nil, errors.Trace(err)
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
if txn.Valid() {
txnStartTS = txn.StartTS()
Expand All @@ -271,6 +284,102 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

type chunkRowRecordSet struct {
rows []chunk.Row
idx int
fields []*ast.ResultField
e Executor
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}

func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
chk := req.Chunk
chk.Reset()
for !chk.IsFull() && c.idx < len(c.rows) {
chk.AppendRow(c.rows[c.idx])
c.idx++
}
return nil
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
}

func (c *chunkRowRecordSet) Close() error {
return nil
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
retryCnt := uint(0)
for {
rs, err := a.runPessimisticSelectForUpdate(ctx, sctx, e)
if err == nil {
return rs, nil
}
// Retry this "select for update" statement using a new startTS.
if !strings.Contains(err.Error(), tidbutil.WriteConflictMarker) {
jackysp marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
if retryCnt >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic max retry count reached")
}
retryCnt++
conflictTS := extractConflictTS(err.Error())
if conflictTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
jackysp marked this conversation as resolved.
Show resolved Hide resolved
if err1 != nil {
return nil, err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err = a.buildExecutor(sctx)
if err != nil {
return nil, errors.Trace(err)
}
if err = e.Open(ctx); err != nil {
return nil, errors.Trace(err)
}
}
}

func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
rs := &recordSet{
executor: e,
stmt: a,
}
defer func() {
terror.Log(rs.Close())
}()

var rows []chunk.Row
var err error
fields := rs.Fields()
req := rs.NewRecordBatch()
for {
err = rs.Next(ctx, req)
if err != nil {
// Handle 'write conflict' error.
break
}
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
iter := chunk.NewIterator4Chunk(req.Chunk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
req.Chunk = chunk.Renew(req.Chunk, sctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -307,8 +416,92 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
if err != nil {
return nil, err
}
return nil, err
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, sctx sessionctx.Context, e Executor) error {
jackysp marked this conversation as resolved.
Show resolved Hide resolved
txn, err := sctx.Txn(true)
if err != nil {
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
retryCnt := uint(0)
for {
_, err = a.handleNoDelayExecutor(ctx, sctx, e)
if err != nil {
return err
}
p := txn.(pessimisticTxn)
keys, err1 := p.KeysNeedToLock()
if err1 != nil {
return err1
}
if len(keys) == 0 {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err == nil || !strings.Contains(err.Error(), tidbutil.WriteConflictMarker) {
return err
}
if retryCnt >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return errors.New("pessimistic lock retry limit reached")
}
retryCnt++
conflictTS := extractConflictTS(err.Error())
if conflictTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictTS from a conflict error")
}
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txn.StartTS()),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictTS", conflictTS))
if conflictTS > forUpdateTS {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err = a.buildExecutor(sctx)
if err != nil {
return err
}

// Rollback the statement change before retry it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return err
}
}
}

func extractConflictTS(errStr string) uint64 {
strs := strings.Split(errStr, "conflictTS=")
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if len(strs) != 2 {
return 0
}
tsPart := strs[1]
length := strings.IndexByte(tsPart, ',')
if length < 0 {
return 0
}
tsStr := tsPart[:length]
ts, err := strconv.ParseUint(tsStr, 10, 64)
if err != nil {
return 0
}
return ts
}

return nil, nil
type pessimisticTxn interface {
kv.Transaction
// KeysNeedToLock returns the keys need to be locked.
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor build a executor from plan, prepared statement may need additional procedure.
Expand Down Expand Up @@ -354,14 +547,15 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build()
err := executorExec.Build(b)
if err != nil {
return nil, err
}
a.isPreparedStmt = true
a.Plan = executorExec.plan
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.isSelectForUpdate
return e, nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(recordKey)
err := txn.LockKeys(ctx, 0, recordKey)
if err != nil {
return result, err
}
Expand Down
10 changes: 9 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type executorBuilder struct {
is infoschema.InfoSchema
startTS uint64 // cached when the first time getStartTS() is called
// err is set when there is error happened during Executor building process.
err error
err error
isSelectForUpdate bool
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
Expand Down Expand Up @@ -470,6 +471,10 @@ func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
}

func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor {
b.isSelectForUpdate = true
// Build 'select for update' using the 'for update' ts.
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add txn.IsPessimistic check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line seems not necessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not set b.startTS at the beginning, when builder is constructed?


src := b.build(v.Children()[0])
if b.err != nil {
return nil
Expand Down Expand Up @@ -590,6 +595,7 @@ func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
}

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selectExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -1254,6 +1260,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -1335,6 +1342,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down
Loading