Skip to content

Commit

Permalink
kv: added RunInTransaction style transaction interface (#5347)
Browse files Browse the repository at this point in the history
ref #5293

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
AmoebaProtozoa and ti-chi-bot authored Jan 10, 2023
1 parent 275c43e commit 1b778f2
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 4 deletions.
113 changes: 113 additions & 0 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -181,3 +182,115 @@ func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) {

return resp, errors.WithStack(err)
}

// etcdTxn is used to record user's action during RunInTxn,
// It stores modification in operations to apply as a single transaction during commit.
// All load/loadRange result will be stored in conditions.
// Transaction commit will be successful only if all conditions are met,
// aka, no other transaction has modified values loaded during current transaction.
type etcdTxn struct {
kv *etcdKVBase
ctx context.Context
// mu protects conditions and operations.
mu syncutil.Mutex
conditions []clientv3.Cmp
operations []clientv3.Op
}

// RunInTxn runs user provided function f in a transaction.
func (kv *etcdKVBase) RunInTxn(ctx context.Context, f func(txn Txn) error) error {
txn := &etcdTxn{
kv: kv,
ctx: ctx,
}
err := f(txn)
if err != nil {
return err
}
return txn.commit()
}

// Save puts a put operation into operations.
// Note that save result are not immediately observable before current transaction commit.
func (txn *etcdTxn) Save(key, value string) error {
key = path.Join(txn.kv.rootPath, key)
operation := clientv3.OpPut(key, value)
txn.mu.Lock()
defer txn.mu.Unlock()
txn.operations = append(txn.operations, operation)
return nil
}

// Remove puts a delete operation into operations.
func (txn *etcdTxn) Remove(key string) error {
key = path.Join(txn.kv.rootPath, key)
operation := clientv3.OpDelete(key)
txn.mu.Lock()
defer txn.mu.Unlock()
txn.operations = append(txn.operations, operation)
return nil
}

// Load loads the target value from etcd and puts a comparator into conditions.
func (txn *etcdTxn) Load(key string) (string, error) {
key = path.Join(txn.kv.rootPath, key)
resp, err := etcdutil.EtcdKVGet(txn.kv.client, key)
if err != nil {
return "", err
}
var condition clientv3.Cmp
var value string
switch respLen := len(resp.Kvs); {
case respLen == 0:
// If target key does not contain a value, pin the CreateRevision of the key to 0.
// Returned value should be empty string.
value = ""
condition = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
case respLen == 1:
// If target key has value, must make sure it stays the same at the time of commit.
value = string(resp.Kvs[0].Value)
condition = clientv3.Compare(clientv3.Value(key), "=", value)
default:
// If response contains multiple kvs, error occurred.
return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
}
// Append the check condition to transaction.
txn.mu.Lock()
defer txn.mu.Unlock()
txn.conditions = append(txn.conditions, condition)
return value, nil
}

// LoadRange loads the target range from etcd,
// Then for each value loaded, it puts a comparator into conditions.
func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) {
keys, values, err = txn.kv.LoadRange(key, endKey, limit)
// If LoadRange failed, preserve the failure behavior of base LoadRange.
if err != nil {
return keys, values, err
}
// If LoadRange successful, must make sure values stay the same before commit.
txn.mu.Lock()
defer txn.mu.Unlock()
for i := range keys {
fullKey := path.Join(txn.kv.rootPath, keys[i])
condition := clientv3.Compare(clientv3.Value(fullKey), "=", values[i])
txn.conditions = append(txn.conditions, condition)
}
return keys, values, err
}

// commit perform the operations on etcd, with pre-condition that values observed by user has not been changed.
func (txn *etcdTxn) commit() error {
baseTxn := txn.kv.client.Txn(txn.ctx)
baseTxn.If(txn.conditions...)
baseTxn.Then(txn.operations...)
resp, err := baseTxn.Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
return nil
}
30 changes: 26 additions & 4 deletions pkg/storage/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,32 @@

package kv

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Load(key string) (string, error)
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
import "context"

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
type Txn interface {
Save(key, value string) error
Remove(key string) error
Load(key string) (string, error)
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
}

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Txn
// RunInTxn runs the user provided function in a Transaction.
// If user provided function f returns a non-nil error, then
// transaction will not be committed, the same error will be
// returned by RunInTxn.
// Otherwise, it returns the error occurred during the
// transaction.
// Note that transaction are not committed until RunInTxn returns nil.
// Note:
// 1. Load and LoadRange operations provides only stale read.
// Values saved/ removed during transaction will not be immediately
// observable in the same transaction.
// 2. Only when storage is etcd, does RunInTxn checks that
// values loaded during transaction has not been modified before commit.
RunInTxn(ctx context.Context, f func(txn Txn) error) error
}
53 changes: 53 additions & 0 deletions pkg/storage/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package kv

import (
"context"
"fmt"
"net/url"
"path"
Expand Down Expand Up @@ -45,6 +46,8 @@ func TestEtcd(t *testing.T) {
kv := NewEtcdKVBase(client, rootPath)
testReadWrite(re, kv)
testRange(re, kv)
testSaveMultiple(re, kv, 20)
testLoadConflict(re, kv)
}

func TestLevelDB(t *testing.T) {
Expand All @@ -55,13 +58,15 @@ func TestLevelDB(t *testing.T) {

testReadWrite(re, kv)
testRange(re, kv)
testSaveMultiple(re, kv, 20)
}

func TestMemKV(t *testing.T) {
re := require.New(t)
kv := NewMemoryKV()
testReadWrite(re, kv)
testRange(re, kv)
testSaveMultiple(re, kv, 20)
}

func testReadWrite(re *require.Assertions, kv Base) {
Expand Down Expand Up @@ -137,3 +142,51 @@ func newTestSingleConfig(t *testing.T) *embed.Config {
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}

func testSaveMultiple(re *require.Assertions, kv Base, count int) {
err := kv.RunInTxn(context.Background(), func(txn Txn) error {
var saveErr error
for i := 0; i < count; i++ {
saveErr = txn.Save("key"+strconv.Itoa(i), "val"+strconv.Itoa(i))
if saveErr != nil {
return saveErr
}
}
return nil
})
re.NoError(err)
for i := 0; i < count; i++ {
val, loadErr := kv.Load("key" + strconv.Itoa(i))
re.NoError(loadErr)
re.Equal("val"+strconv.Itoa(i), val)
}
}

// testLoadConflict checks that if any value loaded during the current transaction
// has been modified by another transaction before the current one commit,
// then the current transaction must fail.
func testLoadConflict(re *require.Assertions, kv Base) {
re.NoError(kv.Save("testKey", "initialValue"))
// loader loads the test key value.
loader := func(txn Txn) error {
_, err := txn.Load("testKey")
if err != nil {
return err
}
return nil
}
// When no other writer, loader must succeed.
re.NoError(kv.RunInTxn(context.Background(), loader))

conflictLoader := func(txn Txn) error {
_, err := txn.Load("testKey")
// update key after load.
re.NoError(kv.Save("testKey", "newValue"))
if err != nil {
return err
}
return nil
}
// When other writer exists, loader must error.
re.Error(kv.RunInTxn(context.Background(), conflictLoader))
}
71 changes: 71 additions & 0 deletions pkg/storage/kv/levedb_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package kv

import (
"context"

"github.com/pingcap/errors"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// LevelDBKV is a kv store using LevelDB.
Expand Down Expand Up @@ -74,3 +77,71 @@ func (kv *LevelDBKV) Save(key, value string) error {
func (kv *LevelDBKV) Remove(key string) error {
return errors.WithStack(kv.Delete([]byte(key), nil))
}

// levelDBTxn implements kv.Txn.
// It utilizes leveldb.Batch to batch user operations to an atomic execution unit.
type levelDBTxn struct {
kv *LevelDBKV
ctx context.Context
// mu protects batch.
mu syncutil.Mutex
batch *leveldb.Batch
}

// RunInTxn runs user provided function f in a transaction.
// If user provided function returns error, then transaction will not be committed.
func (kv *LevelDBKV) RunInTxn(ctx context.Context, f func(txn Txn) error) error {
txn := &levelDBTxn{
kv: kv,
ctx: ctx,
batch: new(leveldb.Batch),
}
err := f(txn)
if err != nil {
return err
}
return txn.commit()
}

// Save puts a save operation with target key value into levelDB batch.
func (txn *levelDBTxn) Save(key, value string) error {
txn.mu.Lock()
defer txn.mu.Unlock()

txn.batch.Put([]byte(key), []byte(value))
return nil
}

// Remove puts a delete operation with target key into levelDB batch.
func (txn *levelDBTxn) Remove(key string) error {
txn.mu.Lock()
defer txn.mu.Unlock()

txn.batch.Delete([]byte(key))
return nil
}

// Load executes base's load.
func (txn *levelDBTxn) Load(key string) (string, error) {
return txn.kv.Load(key)
}

// LoadRange executes base's load range.
func (txn *levelDBTxn) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) {
return txn.kv.LoadRange(key, endKey, limit)
}

// commit writes the batch constructed into levelDB.
func (txn *levelDBTxn) commit() error {
// Check context first to make sure transaction is not cancelled.
select {
default:
case <-txn.ctx.Done():
return txn.ctx.Err()
}

txn.mu.Lock()
defer txn.mu.Unlock()

return txn.kv.Write(txn.batch, nil)
}
Loading

0 comments on commit 1b778f2

Please sign in to comment.