Skip to content

Commit

Permalink
distsql: disable read committed isolation level (#7280)
Browse files Browse the repository at this point in the history
read committed isolation may cause inconsistent read.
Only Analyze request use read committed isolation level.
  • Loading branch information
coocood authored Aug 6, 2018
1 parent 88bf9f7 commit a7fc9c3
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 71 deletions.
1 change: 0 additions & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl tab

builder.Request.NotFillCache = true
builder.Request.Priority = kv.PriorityLow
builder.Request.IsolationLevel = kv.SI

kvReq, err := builder.Build()
sctx := newContext(d.store)
Expand Down
15 changes: 4 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"math"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -125,15 +124,9 @@ func (builder *RequestBuilder) SetKeepOrder(order bool) *RequestBuilder {
return builder
}

func (builder *RequestBuilder) getIsolationLevel(sv *variable.SessionVars) kv.IsoLevel {
var isoLevel string
if sv.TxnIsolationLevelOneShot.State == 2 {
isoLevel = sv.TxnIsolationLevelOneShot.Value
}
if isoLevel == "" {
isoLevel, _ = sv.GetSystemVar(variable.TxnIsolation)
}
if isoLevel == ast.ReadCommitted {
func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
switch builder.Tp {
case kv.ReqTypeAnalyze:
return kv.RC
}
return kv.SI
Expand All @@ -155,7 +148,7 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
// "Concurrency", "IsolationLevel", "NotFillCache".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel(sv)
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
return builder
Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s *testSuite) TestRequestBuilder5(c *C) {
KeepOrder: true,
Desc: false,
Concurrency: 15,
IsolationLevel: 0,
IsolationLevel: kv.RC,
Priority: 1,
NotFillCache: true,
SyncLog: false,
Expand Down
3 changes: 0 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -176,7 +175,6 @@ func (e *AnalyzeIndexExec) open() error {
SetKeepOrder(true).
Build()
kvReq.Concurrency = e.concurrency
kvReq.IsolationLevel = kv.RC
ctx := context.TODO()
e.result, err = distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars)
if err != nil {
Expand Down Expand Up @@ -291,7 +289,6 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(e.keepOrder).
Build()
kvReq.IsolationLevel = kv.RC
kvReq.Concurrency = e.concurrency
if err != nil {
return nil, errors.Trace(err)
Expand Down
8 changes: 0 additions & 8 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
Expand Down Expand Up @@ -181,13 +180,6 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
log.Infof("con:%d %s=%s", sessionVars.ConnectionID, name, valStr)
}

if name == variable.TxnIsolation {
isoLevel, _ := sessionVars.GetSystemVar(variable.TxnIsolation)
if isoLevel == ast.ReadCommitted {
e.ctx.Txn().SetOption(kv.IsolationLevel, kv.RC)
}
}

return nil
}

Expand Down
4 changes: 0 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1351,10 +1351,6 @@ func (s *session) ActivePendingTxn() error {
return errors.Trace(err)
}
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
isoLevel, _ := s.sessionVars.GetSystemVar(variable.TxnIsolation)
if isoLevel == ast.ReadCommitted {
s.txn.SetOption(kv.IsolationLevel, kv.RC)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@ func (s *testSessionSuite) TestSetTransactionIsolationOneShot(c *C) {

// Check isolation level is set to read committed.
ctx := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
c.Assert(req.IsolationLevel, Equals, kv.RC)
c.Assert(req.IsolationLevel, Equals, kv.SI)
})
tk.Se.Execute(ctx, "select * from t where k = 1")

Expand Down
16 changes: 0 additions & 16 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,6 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
}

func (s *testLockSuite) TestRC(c *C) {
s.putKV(c, []byte("key"), []byte("v1"))

txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set([]byte("key"), []byte("v2"))
s.prewriteTxn(c, txn.(*tikvTxn))

txn2, err := s.store.Begin()
c.Assert(err, IsNil)
txn2.SetOption(kv.IsolationLevel, kv.RC)
val, err := txn2.Get([]byte("key"))
c.Assert(err, IsNil)
c.Assert(string(val), Equals, "v1")
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitter(txn, 0)
c.Assert(err, IsNil)
Expand Down
5 changes: 2 additions & 3 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ func (s *Scanner) getData(bo *Backoffer) error {
Version: s.startTS(),
},
Context: pb.Context{
IsolationLevel: pbIsolationLevel(s.snapshot.isolationLevel),
Priority: s.snapshot.priority,
NotFillCache: s.snapshot.notFillCache,
Priority: s.snapshot.priority,
NotFillCache: s.snapshot.notFillCache,
},
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
Expand Down
32 changes: 14 additions & 18 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,23 @@ const (

// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
store *tikvStore
version kv.Version
isolationLevel kv.IsoLevel
priority pb.CommandPri
notFillCache bool
syncLog bool
vars *kv.Variables
store *tikvStore
version kv.Version
priority pb.CommandPri
notFillCache bool
syncLog bool
vars *kv.Variables
}

var snapshotGP = gp.New(time.Minute)

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
return &tikvSnapshot{
store: store,
version: ver,
isolationLevel: kv.SI,
priority: pb.CommandPri_Normal,
vars: kv.DefaultVars,
store: store,
version: ver,
priority: pb.CommandPri_Normal,
vars: kv.DefaultVars,
}
}

Expand Down Expand Up @@ -151,9 +149,8 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
Version: s.version.Ver,
},
Context: pb.Context{
Priority: s.priority,
IsolationLevel: pbIsolationLevel(s.isolationLevel),
NotFillCache: s.notFillCache,
Priority: s.priority,
NotFillCache: s.notFillCache,
},
}
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium)
Expand Down Expand Up @@ -233,9 +230,8 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
Version: s.version.Ver,
},
Context: pb.Context{
Priority: s.priority,
IsolationLevel: pbIsolationLevel(s.isolationLevel),
NotFillCache: s.notFillCache,
Priority: s.priority,
NotFillCache: s.notFillCache,
},
}
for {
Expand Down
5 changes: 0 additions & 5 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ func (txn *tikvTxn) Delete(k kv.Key) error {
func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
txn.us.SetOption(opt, val)
switch opt {
case kv.IsolationLevel:
txn.snapshot.isolationLevel = val.(kv.IsoLevel)
case kv.Priority:
txn.snapshot.priority = kvPriorityToCommandPri(val.(int))
case kv.NotFillCache:
Expand All @@ -157,9 +155,6 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {

func (txn *tikvTxn) DelOption(opt kv.Option) {
txn.us.DelOption(opt)
if opt == kv.IsolationLevel {
txn.snapshot.isolationLevel = kv.SI
}
}

func (txn *tikvTxn) Commit(ctx context.Context) error {
Expand Down

0 comments on commit a7fc9c3

Please sign in to comment.