Skip to content

Commit

Permalink
*: use txn for saving timestamp (#6199)
Browse files Browse the repository at this point in the history
ref #5895

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx and ti-chi-bot authored Mar 22, 2023
1 parent 738e15f commit 27b9474
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
24 changes: 22 additions & 2 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package endpoint

import (
"context"
"strings"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -65,6 +67,24 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {

// SaveTimestamp saves the timestamp to the storage.
func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
return se.Save(key, string(data))
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
value, err := txn.Load(key)
if err != nil {
return err
}

previousTS := typeutil.ZeroTime
if value != "" {
previousTS, err = typeutil.ParseTimestamp([]byte(value))
if err != nil {
log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err))
return err
}
}
if previousTS != typeutil.ZeroTime && typeutil.SubRealTimeByWallClock(ts, previousTS) <= 0 {
return nil
}
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
return txn.Save(key, string(data))
})
}
31 changes: 31 additions & 0 deletions pkg/storage/storage_tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,34 @@ func TestGlobalLocalTimestamp(t *testing.T) {
re.NoError(err)
re.Equal(localTS1, ts)
}

func TestTimestampTxn(t *testing.T) {
re := require.New(t)

cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
defer etcd.Close()

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

timestampKey := "timestamp"

globalTS1 := time.Now().Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS1)
re.NoError(err)

globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS2)
re.NoError(err)

ts, err := storage.LoadTimestamp("")
re.NoError(err)
re.Equal(globalTS1, ts)
}

0 comments on commit 27b9474

Please sign in to comment.