From 1b778f235434e440d3533ec18ba51b0e77215a84 Mon Sep 17 00:00:00 2001 From: David <8039876+AmoebaProtozoa@users.noreply.github.com> Date: Tue, 10 Jan 2023 17:08:23 +0800 Subject: [PATCH] kv: added RunInTransaction style transaction interface (#5347) ref tikv/pd#5293 Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: Ti Chi Robot --- pkg/storage/kv/etcd_kv.go | 113 ++++++++++++++++++++++++++++++++++++ pkg/storage/kv/kv.go | 30 ++++++++-- pkg/storage/kv/kv_test.go | 53 +++++++++++++++++ pkg/storage/kv/levedb_kv.go | 71 ++++++++++++++++++++++ pkg/storage/kv/mem_kv.go | 102 ++++++++++++++++++++++++++++++++ 5 files changed, 365 insertions(+), 4 deletions(-) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 5f8f92e4e98..94646689676 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -181,3 +182,115 @@ func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) { return resp, errors.WithStack(err) } + +// etcdTxn is used to record user's action during RunInTxn, +// It stores modification in operations to apply as a single transaction during commit. +// All load/loadRange result will be stored in conditions. +// Transaction commit will be successful only if all conditions are met, +// aka, no other transaction has modified values loaded during current transaction. +type etcdTxn struct { + kv *etcdKVBase + ctx context.Context + // mu protects conditions and operations. + mu syncutil.Mutex + conditions []clientv3.Cmp + operations []clientv3.Op +} + +// RunInTxn runs user provided function f in a transaction. +func (kv *etcdKVBase) RunInTxn(ctx context.Context, f func(txn Txn) error) error { + txn := &etcdTxn{ + kv: kv, + ctx: ctx, + } + err := f(txn) + if err != nil { + return err + } + return txn.commit() +} + +// Save puts a put operation into operations. +// Note that save result are not immediately observable before current transaction commit. +func (txn *etcdTxn) Save(key, value string) error { + key = path.Join(txn.kv.rootPath, key) + operation := clientv3.OpPut(key, value) + txn.mu.Lock() + defer txn.mu.Unlock() + txn.operations = append(txn.operations, operation) + return nil +} + +// Remove puts a delete operation into operations. +func (txn *etcdTxn) Remove(key string) error { + key = path.Join(txn.kv.rootPath, key) + operation := clientv3.OpDelete(key) + txn.mu.Lock() + defer txn.mu.Unlock() + txn.operations = append(txn.operations, operation) + return nil +} + +// Load loads the target value from etcd and puts a comparator into conditions. +func (txn *etcdTxn) Load(key string) (string, error) { + key = path.Join(txn.kv.rootPath, key) + resp, err := etcdutil.EtcdKVGet(txn.kv.client, key) + if err != nil { + return "", err + } + var condition clientv3.Cmp + var value string + switch respLen := len(resp.Kvs); { + case respLen == 0: + // If target key does not contain a value, pin the CreateRevision of the key to 0. + // Returned value should be empty string. + value = "" + condition = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) + case respLen == 1: + // If target key has value, must make sure it stays the same at the time of commit. + value = string(resp.Kvs[0].Value) + condition = clientv3.Compare(clientv3.Value(key), "=", value) + default: + // If response contains multiple kvs, error occurred. + return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs) + } + // Append the check condition to transaction. + txn.mu.Lock() + defer txn.mu.Unlock() + txn.conditions = append(txn.conditions, condition) + return value, nil +} + +// LoadRange loads the target range from etcd, +// Then for each value loaded, it puts a comparator into conditions. +func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) { + keys, values, err = txn.kv.LoadRange(key, endKey, limit) + // If LoadRange failed, preserve the failure behavior of base LoadRange. + if err != nil { + return keys, values, err + } + // If LoadRange successful, must make sure values stay the same before commit. + txn.mu.Lock() + defer txn.mu.Unlock() + for i := range keys { + fullKey := path.Join(txn.kv.rootPath, keys[i]) + condition := clientv3.Compare(clientv3.Value(fullKey), "=", values[i]) + txn.conditions = append(txn.conditions, condition) + } + return keys, values, err +} + +// commit perform the operations on etcd, with pre-condition that values observed by user has not been changed. +func (txn *etcdTxn) commit() error { + baseTxn := txn.kv.client.Txn(txn.ctx) + baseTxn.If(txn.conditions...) + baseTxn.Then(txn.operations...) + resp, err := baseTxn.Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + return nil +} diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index 2f1fa06e144..a6e870db9c9 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -14,10 +14,32 @@ package kv -// Base is an abstract interface for load/save pd cluster data. -type Base interface { - Load(key string) (string, error) - LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) +import "context" + +// Txn bundles multiple operations into a single executable unit. +// It enables kv to atomically apply a set of updates. +type Txn interface { Save(key, value string) error Remove(key string) error + Load(key string) (string, error) + LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) +} + +// Base is an abstract interface for load/save pd cluster data. +type Base interface { + Txn + // RunInTxn runs the user provided function in a Transaction. + // If user provided function f returns a non-nil error, then + // transaction will not be committed, the same error will be + // returned by RunInTxn. + // Otherwise, it returns the error occurred during the + // transaction. + // Note that transaction are not committed until RunInTxn returns nil. + // Note: + // 1. Load and LoadRange operations provides only stale read. + // Values saved/ removed during transaction will not be immediately + // observable in the same transaction. + // 2. Only when storage is etcd, does RunInTxn checks that + // values loaded during transaction has not been modified before commit. + RunInTxn(ctx context.Context, f func(txn Txn) error) error } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 509376c864f..9474eb23f4f 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -15,6 +15,7 @@ package kv import ( + "context" "fmt" "net/url" "path" @@ -45,6 +46,8 @@ func TestEtcd(t *testing.T) { kv := NewEtcdKVBase(client, rootPath) testReadWrite(re, kv) testRange(re, kv) + testSaveMultiple(re, kv, 20) + testLoadConflict(re, kv) } func TestLevelDB(t *testing.T) { @@ -55,6 +58,7 @@ func TestLevelDB(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) + testSaveMultiple(re, kv, 20) } func TestMemKV(t *testing.T) { @@ -62,6 +66,7 @@ func TestMemKV(t *testing.T) { kv := NewMemoryKV() testReadWrite(re, kv) testRange(re, kv) + testSaveMultiple(re, kv, 20) } func testReadWrite(re *require.Assertions, kv Base) { @@ -137,3 +142,51 @@ func newTestSingleConfig(t *testing.T) *embed.Config { cfg.ClusterState = embed.ClusterStateFlagNew return cfg } + +func testSaveMultiple(re *require.Assertions, kv Base, count int) { + err := kv.RunInTxn(context.Background(), func(txn Txn) error { + var saveErr error + for i := 0; i < count; i++ { + saveErr = txn.Save("key"+strconv.Itoa(i), "val"+strconv.Itoa(i)) + if saveErr != nil { + return saveErr + } + } + return nil + }) + re.NoError(err) + for i := 0; i < count; i++ { + val, loadErr := kv.Load("key" + strconv.Itoa(i)) + re.NoError(loadErr) + re.Equal("val"+strconv.Itoa(i), val) + } +} + +// testLoadConflict checks that if any value loaded during the current transaction +// has been modified by another transaction before the current one commit, +// then the current transaction must fail. +func testLoadConflict(re *require.Assertions, kv Base) { + re.NoError(kv.Save("testKey", "initialValue")) + // loader loads the test key value. + loader := func(txn Txn) error { + _, err := txn.Load("testKey") + if err != nil { + return err + } + return nil + } + // When no other writer, loader must succeed. + re.NoError(kv.RunInTxn(context.Background(), loader)) + + conflictLoader := func(txn Txn) error { + _, err := txn.Load("testKey") + // update key after load. + re.NoError(kv.Save("testKey", "newValue")) + if err != nil { + return err + } + return nil + } + // When other writer exists, loader must error. + re.Error(kv.RunInTxn(context.Background(), conflictLoader)) +} diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 92dd05a7810..6f93cd0237f 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -15,10 +15,13 @@ package kv import ( + "context" + "github.com/pingcap/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/syncutil" ) // LevelDBKV is a kv store using LevelDB. @@ -74,3 +77,71 @@ func (kv *LevelDBKV) Save(key, value string) error { func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } + +// levelDBTxn implements kv.Txn. +// It utilizes leveldb.Batch to batch user operations to an atomic execution unit. +type levelDBTxn struct { + kv *LevelDBKV + ctx context.Context + // mu protects batch. + mu syncutil.Mutex + batch *leveldb.Batch +} + +// RunInTxn runs user provided function f in a transaction. +// If user provided function returns error, then transaction will not be committed. +func (kv *LevelDBKV) RunInTxn(ctx context.Context, f func(txn Txn) error) error { + txn := &levelDBTxn{ + kv: kv, + ctx: ctx, + batch: new(leveldb.Batch), + } + err := f(txn) + if err != nil { + return err + } + return txn.commit() +} + +// Save puts a save operation with target key value into levelDB batch. +func (txn *levelDBTxn) Save(key, value string) error { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.batch.Put([]byte(key), []byte(value)) + return nil +} + +// Remove puts a delete operation with target key into levelDB batch. +func (txn *levelDBTxn) Remove(key string) error { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.batch.Delete([]byte(key)) + return nil +} + +// Load executes base's load. +func (txn *levelDBTxn) Load(key string) (string, error) { + return txn.kv.Load(key) +} + +// LoadRange executes base's load range. +func (txn *levelDBTxn) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) { + return txn.kv.LoadRange(key, endKey, limit) +} + +// commit writes the batch constructed into levelDB. +func (txn *levelDBTxn) commit() error { + // Check context first to make sure transaction is not cancelled. + select { + default: + case <-txn.ctx.Done(): + return txn.ctx.Err() + } + + txn.mu.Lock() + defer txn.mu.Unlock() + + return txn.kv.Write(txn.batch, nil) +} diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index 71faf8caa36..91d13c04e61 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -15,6 +15,8 @@ package kv import ( + "context" + "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -89,3 +91,103 @@ func (kv *memoryKV) Remove(key string) error { kv.tree.Delete(memoryKVItem{key, ""}) return nil } + +// memTxn implements kv.Txn. +type memTxn struct { + kv *memoryKV + ctx context.Context + // mu protects ops. + mu syncutil.Mutex + ops []*op +} + +// op represents an Operation that memKV can execute. +type op struct { + t opType + key string + val string +} + +type opType int + +const ( + tPut opType = iota + tDelete +) + +// RunInTxn runs the user provided function f in a transaction. +// If user provided function returns error, then transaction will not be committed. +func (kv *memoryKV) RunInTxn(ctx context.Context, f func(txn Txn) error) error { + txn := &memTxn{ + kv: kv, + ctx: ctx, + } + err := f(txn) + if err != nil { + return err + } + return txn.commit() +} + +// Save appends a save operation to ops. +func (txn *memTxn) Save(key, value string) error { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.ops = append(txn.ops, &op{ + t: tPut, + key: key, + val: value, + }) + return nil +} + +// Remove appends a remove operation to ops. +func (txn *memTxn) Remove(key string) error { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.ops = append(txn.ops, &op{ + t: tDelete, + key: key, + }) + return nil +} + +// Load executes base's load directly. +func (txn *memTxn) Load(key string) (string, error) { + return txn.kv.Load(key) +} + +// LoadRange executes base's load range directly. +func (txn *memTxn) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) { + return txn.kv.LoadRange(key, endKey, limit) +} + +// commit executes operations in ops. +func (txn *memTxn) commit() error { + // Check context first to make sure transaction is not cancelled. + select { + default: + case <-txn.ctx.Done(): + return txn.ctx.Err() + } + // Lock txn.mu to protect memTxn ops. + txn.mu.Lock() + defer txn.mu.Unlock() + // Lock kv.lock to protect the execution of the batch, + // making the execution atomic. + txn.kv.Lock() + defer txn.kv.Unlock() + // Execute mutations in order. + // Note: executions in mem_kv never fails. + for _, op := range txn.ops { + switch op.t { + case tPut: + txn.kv.tree.ReplaceOrInsert(memoryKVItem{op.key, op.val}) + case tDelete: + txn.kv.tree.Delete(memoryKVItem{op.key, ""}) + } + } + return nil +}