Skip to content

Commit

Permalink
store/tikv: move transaction options out to /kv (#24619)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored May 13, 2021
1 parent 9c3f1b7 commit d2bdfd5
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 152 deletions.
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version

ver := kv.Version{Ver: version}
snap := store.GetSnapshot(ver)
snap.SetOption(tikvstore.Priority, priority)
snap.SetOption(kv.Priority, priority)

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1346,7 +1345,7 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -1117,7 +1116,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down Expand Up @@ -1329,7 +1328,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -629,7 +628,7 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
newForUpdateTS = version.Ver
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(tikvstore.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
return nil
}

Expand Down
18 changes: 9 additions & 9 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,9 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
return nil, errors.Trace(err)
}
}
txn.SetOption(tikvstore.Priority, kv.PriorityLow)
txn.SetOption(tikvstore.IsolationLevel, kv.RC)
txn.SetOption(tikvstore.NotFillCache, true)
txn.SetOption(kv.Priority, kv.PriorityLow)
txn.SetOption(kv.IsolationLevel, kv.RC)
txn.SetOption(kv.NotFillCache, true)
return rollbackFn, nil
}

Expand Down Expand Up @@ -1322,7 +1322,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
Expand All @@ -1341,11 +1341,11 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
defer e.wg.Done()
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(tikvstore.NotFillCache, true)
snapshot.SetOption(tikvstore.IsolationLevel, kv.RC)
snapshot.SetOption(tikvstore.Priority, kv.PriorityLow)
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}

rander := rand.New(rand.NewSource(e.randSeed))
Expand All @@ -1356,7 +1356,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
lower, upper := step-uint32(2*math.Sqrt(float64(step))), step
step = uint32(rander.Intn(int(upper-lower))) + lower
}
snapshot.SetOption(tikvstore.SampleStep, step)
snapshot.SetOption(kv.SampleStep, step)
kvMap := make(map[string][]byte)
var iter kv.Iterator
iter, *err = snapshot.Iter(kv.Key(task.StartKey), kv.Key(task.EndKey))
Expand Down
12 changes: 6 additions & 6 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ func (e *BatchPointGetExec) Open(context.Context) error {
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats)
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness)
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
Expand All @@ -149,7 +149,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.DelOption(tikvstore.CollectRuntimeStats)
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
e.inited = 0
e.index = 0
Expand Down
5 changes: 2 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -215,8 +214,8 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
5 changes: 2 additions & 3 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1049,8 +1048,8 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
14 changes: 7 additions & 7 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,17 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats)
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
e.snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
e.snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness)
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
e.snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
Expand All @@ -167,7 +167,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.DelOption(tikvstore.CollectRuntimeStats)
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
if e.idxInfo != nil && e.tblInfo != nil {
actRows := int64(0)
Expand Down Expand Up @@ -391,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
}

func (e *PointGetExecutor) verifyTxnScope() error {
txnScope := e.txn.GetOption(tikvstore.TxnScope).(string)
txnScope := e.txn.GetOption(kv.TxnScope).(string)
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -221,8 +220,8 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
5 changes: 2 additions & 3 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -606,10 +605,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return err
}
if e.ctx.GetSessionVars().TxnCtx.IsPessimistic {
txn.SetOption(tikvstore.Pessimistic, true)
txn.SetOption(kv.Pessimistic, true)
}
if s.CausalConsistencyOnly {
txn.SetOption(tikvstore.GuaranteeLinearizability, false)
txn.SetOption(kv.GuaranteeLinearizability, false)
}
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -261,7 +260,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
if e.collectRuntimeStatsEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down Expand Up @@ -408,7 +407,7 @@ func (e *UpdateExec) Close() error {
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().DelOption(tikvstore.CollectRuntimeStats)
txn.GetSnapshot().DelOption(kv.CollectRuntimeStats)
}
}
return e.children[0].Close()
Expand Down
3 changes: 1 addition & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

. "github.com/pingcap/check"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

Expand All @@ -35,7 +34,7 @@ func (s testMockSuite) TestInterface(c *C) {
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
c.Check(err, IsNil)
snapshot.SetOption(tikvstore.Priority, PriorityNormal)
snapshot.SetOption(Priority, PriorityNormal)

transaction, err := storage.Begin()
c.Check(err, IsNil)
Expand Down
62 changes: 62 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 PingCAP, Inc.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

// Transaction options
const (
// BinlogInfo contains the binlog data and client.
BinlogInfo int = iota + 1
// SchemaChecker is used for checking schema-validity.
SchemaChecker
// IsolationLevel sets isolation level for current transaction. The default level is SI.
IsolationLevel
// Priority marks the priority of this transaction.
Priority
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog
// KeyOnly retrieve only keys, it can be used in scan now.
KeyOnly
// Pessimistic is defined for pessimistic lock
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
// Set task ID
TaskID
// InfoSchema is schema version used by txn startTS.
InfoSchema
// CollectRuntimeStats is used to enable collect runtime stats.
CollectRuntimeStats
// SchemaAmender is used to amend mutations for pessimistic transactions
SchemaAmender
// SampleStep skips 'SampleStep - 1' number of keys after each returned key.
SampleStep
// CommitHook is a callback function called right after the transaction gets committed
CommitHook
// EnableAsyncCommit indicates whether async commit is enabled
EnableAsyncCommit
// Enable1PC indicates whether one-phase commit is enabled
Enable1PC
// GuaranteeLinearizability indicates whether to guarantee linearizability at the cost of an extra tso request before prewrite
GuaranteeLinearizability
// TxnScope indicates which @@txn_scope this transaction will work with.
TxnScope
// StalenessReadOnly indicates whether the transaction is staleness read only transaction
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
)
5 changes: 2 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/structure"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -94,8 +93,8 @@ type Meta struct {
// NewMeta creates a Meta in transaction txn.
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
txn.SetOption(tikvstore.Priority, kv.PriorityHigh)
txn.SetOption(tikvstore.SyncLog, struct{}{})
txn.SetOption(kv.Priority, kv.PriorityHigh)
txn.SetOption(kv.SyncLog, struct{}{})
t := structure.NewStructure(txn, txn, mMetaPrefix)
listKey := DefaultJobListKey
if len(jobListKeys) != 0 {
Expand Down
Loading

0 comments on commit d2bdfd5

Please sign in to comment.