Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: move transaction options out to /kv #24619

Merged
merged 3 commits into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version

ver := kv.Version{Ver: version}
snap := store.GetSnapshot(ver)
snap.SetOption(tikvstore.Priority, priority)
snap.SetOption(kv.Priority, priority)

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1346,7 +1345,7 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -1117,7 +1116,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down Expand Up @@ -1329,7 +1328,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(tikvstore.Priority, w.priority)
txn.SetOption(kv.Priority, w.priority)

idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -629,7 +628,7 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
newForUpdateTS = version.Ver
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(tikvstore.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
return nil
}

Expand Down
18 changes: 9 additions & 9 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,9 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
return nil, errors.Trace(err)
}
}
txn.SetOption(tikvstore.Priority, kv.PriorityLow)
txn.SetOption(tikvstore.IsolationLevel, kv.RC)
txn.SetOption(tikvstore.NotFillCache, true)
txn.SetOption(kv.Priority, kv.PriorityLow)
txn.SetOption(kv.IsolationLevel, kv.RC)
txn.SetOption(kv.NotFillCache, true)
return rollbackFn, nil
}

Expand Down Expand Up @@ -1322,7 +1322,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
Expand All @@ -1341,11 +1341,11 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
defer e.wg.Done()
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(tikvstore.NotFillCache, true)
snapshot.SetOption(tikvstore.IsolationLevel, kv.RC)
snapshot.SetOption(tikvstore.Priority, kv.PriorityLow)
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}

rander := rand.New(rand.NewSource(e.randSeed))
Expand All @@ -1356,7 +1356,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
lower, upper := step-uint32(2*math.Sqrt(float64(step))), step
step = uint32(rander.Intn(int(upper-lower))) + lower
}
snapshot.SetOption(tikvstore.SampleStep, step)
snapshot.SetOption(kv.SampleStep, step)
kvMap := make(map[string][]byte)
var iter kv.Iterator
iter, *err = snapshot.Iter(kv.Key(task.StartKey), kv.Key(task.EndKey))
Expand Down
12 changes: 6 additions & 6 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ func (e *BatchPointGetExec) Open(context.Context) error {
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats)
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness)
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
Expand All @@ -149,7 +149,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.DelOption(tikvstore.CollectRuntimeStats)
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
e.inited = 0
e.index = 0
Expand Down
5 changes: 2 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -215,8 +214,8 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
5 changes: 2 additions & 3 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1049,8 +1048,8 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
14 changes: 7 additions & 7 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,17 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(tikvstore.CollectRuntimeStats, snapshotStats)
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
e.snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
}
e.snapshot.SetOption(tikvstore.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(tikvstore.IsStalenessReadOnly, isStaleness)
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
e.snapshot.SetOption(tikvstore.MatchStoreLabels, []*metapb.StoreLabel{
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
Expand All @@ -167,7 +167,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.DelOption(tikvstore.CollectRuntimeStats)
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
if e.idxInfo != nil && e.tblInfo != nil {
actRows := int64(0)
Expand Down Expand Up @@ -391,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
}

func (e *PointGetExecutor) verifyTxnScope() error {
txnScope := e.txn.GetOption(tikvstore.TxnScope).(string)
txnScope := e.txn.GetOption(kv.TxnScope).(string)
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -221,8 +220,8 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(tikvstore.CollectRuntimeStats)
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}
prefetchStart := time.Now()
Expand Down
5 changes: 2 additions & 3 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -606,10 +605,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return err
}
if e.ctx.GetSessionVars().TxnCtx.IsPessimistic {
txn.SetOption(tikvstore.Pessimistic, true)
txn.SetOption(kv.Pessimistic, true)
}
if s.CausalConsistencyOnly {
txn.SetOption(tikvstore.GuaranteeLinearizability, false)
txn.SetOption(kv.GuaranteeLinearizability, false)
}
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -261,7 +260,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
if e.collectRuntimeStatsEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(tikvstore.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down Expand Up @@ -408,7 +407,7 @@ func (e *UpdateExec) Close() error {
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().DelOption(tikvstore.CollectRuntimeStats)
txn.GetSnapshot().DelOption(kv.CollectRuntimeStats)
}
}
return e.children[0].Close()
Expand Down
3 changes: 1 addition & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

. "github.com/pingcap/check"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

Expand All @@ -35,7 +34,7 @@ func (s testMockSuite) TestInterface(c *C) {
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
c.Check(err, IsNil)
snapshot.SetOption(tikvstore.Priority, PriorityNormal)
snapshot.SetOption(Priority, PriorityNormal)

transaction, err := storage.Begin()
c.Check(err, IsNil)
Expand Down
62 changes: 62 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 PingCAP, Inc.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

// Transaction options
const (
// BinlogInfo contains the binlog data and client.
BinlogInfo int = iota + 1
// SchemaChecker is used for checking schema-validity.
SchemaChecker
// IsolationLevel sets isolation level for current transaction. The default level is SI.
IsolationLevel
// Priority marks the priority of this transaction.
Priority
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog
// KeyOnly retrieve only keys, it can be used in scan now.
KeyOnly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think KeyOnly is already unused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean this variable.

// Pessimistic is defined for pessimistic lock
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
// Set task ID
TaskID
// InfoSchema is schema version used by txn startTS.
InfoSchema
// CollectRuntimeStats is used to enable collect runtime stats.
CollectRuntimeStats
// SchemaAmender is used to amend mutations for pessimistic transactions
SchemaAmender
// SampleStep skips 'SampleStep - 1' number of keys after each returned key.
SampleStep
// CommitHook is a callback function called right after the transaction gets committed
CommitHook
// EnableAsyncCommit indicates whether async commit is enabled
EnableAsyncCommit
// Enable1PC indicates whether one-phase commit is enabled
Enable1PC
// GuaranteeLinearizability indicates whether to guarantee linearizability at the cost of an extra tso request before prewrite
GuaranteeLinearizability
// TxnScope indicates which @@txn_scope this transaction will work with.
TxnScope
// StalenessReadOnly indicates whether the transaction is staleness read only transaction
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
)
5 changes: 2 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/structure"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -94,8 +93,8 @@ type Meta struct {
// NewMeta creates a Meta in transaction txn.
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
txn.SetOption(tikvstore.Priority, kv.PriorityHigh)
txn.SetOption(tikvstore.SyncLog, struct{}{})
txn.SetOption(kv.Priority, kv.PriorityHigh)
txn.SetOption(kv.SyncLog, struct{}{})
t := structure.NewStructure(txn, txn, mMetaPrefix)
listKey := DefaultJobListKey
if len(jobListKeys) != 0 {
Expand Down
Loading