From 27b9474ccff9880ffed1bef8bb390e70fb8fc9d1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 22 Mar 2023 14:58:43 +0800 Subject: [PATCH] *: use txn for saving timestamp (#6199) ref tikv/pd#5895 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/storage/endpoint/tso.go | 24 ++++++++++++++++++++++-- pkg/storage/storage_tso_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index 81f2a2f6fd0..e4838318bd9 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -15,10 +15,12 @@ package endpoint import ( + "context" "strings" "time" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -65,6 +67,24 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { // SaveTimestamp saves the timestamp to the storage. func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error { - data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - return se.Save(key, string(data)) + return se.RunInTxn(context.Background(), func(txn kv.Txn) error { + value, err := txn.Load(key) + if err != nil { + return err + } + + previousTS := typeutil.ZeroTime + if value != "" { + previousTS, err = typeutil.ParseTimestamp([]byte(value)) + if err != nil { + log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err)) + return err + } + } + if previousTS != typeutil.ZeroTime && typeutil.SubRealTimeByWallClock(ts, previousTS) <= 0 { + return nil + } + data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) + return txn.Save(key, string(data)) + }) } diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 22f6a718022..382e7017896 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -91,3 +91,34 @@ func TestGlobalLocalTimestamp(t *testing.T) { re.NoError(err) re.Equal(localTS1, ts) } + +func TestTimestampTxn(t *testing.T) { + re := require.New(t) + + cfg := etcdutil.NewTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + defer etcd.Close() + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + storage := NewStorageWithEtcdBackend(client, rootPath) + + timestampKey := "timestamp" + + globalTS1 := time.Now().Round(0) + err = storage.SaveTimestamp(timestampKey, globalTS1) + re.NoError(err) + + globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) + err = storage.SaveTimestamp(timestampKey, globalTS2) + re.NoError(err) + + ts, err := storage.LoadTimestamp("") + re.NoError(err) + re.Equal(globalTS1, ts) +}