Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] *: GC API Refactorying #8989

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5 h1:aBpjrLJlsSIkxh0pg6QuIAzMbajgPLvoELAnNNyFn2E=
github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ error = '''
etcd transaction failed, conflicted and rolled back
'''

[PD:etcd:ErrEtcdTxnResponse]
error = '''
etcd transaction returned invalid response: %v
'''

["PD:etcd:ErrEtcdTxnInternal"]
error = '''
internal etcd transaction error occurred
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5
1,716 changes: 1,707 additions & 9 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ var (
ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease"))
ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal"))
ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict"))
ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
Expand Down
49 changes: 29 additions & 20 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/mcs/utils/constant"

"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
Expand All @@ -28,31 +29,31 @@ import (
var blockGCSafePointErrmsg = "don't allow update gc safe point v1."
var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
cfg config.PDServerConfig
// GCStateManager is the manager for safePoint of GC and services.
type GCStateManager struct {
lock *syncutil.RWLockGroup
store endpoint.GCStateStorage
cfg config.PDServerConfig
}

// NewSafePointManager creates a SafePointManager of GC and services.
func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager {
return &SafePointManager{store: store, cfg: cfg}
// NewGCStateManager creates a GCStateManager of GC and services.
func NewGCStateManager(store endpoint.GCStateStorage, cfg config.PDServerConfig) *GCStateManager {
return &GCStateManager{store: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
func (manager *SafePointManager) LoadGCSafePoint() (uint64, error) {
return manager.store.LoadGCSafePoint()
func (manager *GCStateManager) LoadGCSafePoint(keyspaceID uint32) (uint64, error) {
// No need to acquire the lock as a single read operation is inherently atomic.
return manager.store.LoadGCSafePoint(keyspaceID)
}

// UpdateGCSafePoint updates the safepoint if it is greater than the previous one
// it returns the old safepoint in the storage.
func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.gcLock.Lock()
defer manager.gcLock.Unlock()
func (manager *GCStateManager) UpdateGCSafePoint(keyspaceID uint32, newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.lock.Lock(keyspaceID)
defer manager.lock.Unlock(keyspaceID)
// TODO: cache the safepoint in the storage.
oldSafePoint, err = manager.store.LoadGCSafePoint()
oldSafePoint, err = manager.store.LoadGCSafePoint(keyspaceID)
if err != nil {
return
}
Expand All @@ -64,20 +65,21 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
if oldSafePoint >= newSafePoint {
return
}
err = manager.store.SaveGCSafePoint(newSafePoint)
if err == nil {
err = manager.store.SaveGCSafePoint(keyspaceID, newSafePoint)
if err == nil && keyspaceID == constant.NullKeyspaceID {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))
}
return
}

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
func (manager *GCStateManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
return nil, false, errors.New(blockServiceSafepointErrmsg)
}
manager.serviceGCLock.Lock()
defer manager.serviceGCLock.Unlock()
// This function won't support keyspace as it's being deprecated.
manager.lock.Lock(constant.NullKeyspaceID)
defer manager.lock.Unlock(constant.NullKeyspaceID)
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
if err != nil || ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint {
return minServiceSafePoint, false, err
Expand All @@ -101,3 +103,10 @@ func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newS
}
return minServiceSafePoint, true, err
}

// UpdateTxnSafePoint updates the txn safe point.
func (manager *GCStateManager) UpdateTxnSafePoint(keyspaceID uint32, target uint64) (uint64, error) {
manager.lock.Lock(keyspaceID)
defer manager.lock.Unlock(keyspaceID)

}
10 changes: 5 additions & 5 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
"github.com/tikv/pd/server/config"
)

func newGCStorage() endpoint.GCSafePointStorage {
func newGCStorage() endpoint.GCStateStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcSafePointManager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcSafePointManager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -85,7 +85,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {

func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
manager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
gcWorkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestServiceGCSafePointUpdate(t *testing.T) {

func TestBlockUpdateSafePointV1(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
manager := NewGCStateManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
gcworkerServiceID := "gc_worker"
gcWorkerSafePoint := uint64(8)

Expand Down
4 changes: 2 additions & 2 deletions pkg/gc/safepoint_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ type SafePointV2Manager struct {
// v2Storage is the storage GCSafePointV2 and ServiceSafePointV2.
v2Storage endpoint.SafePointV2Storage
// v1Storage is the storage for v1 format GCSafePoint and ServiceGCSafePoint, it's used during pd update.
v1Storage endpoint.GCSafePointStorage
v1Storage endpoint.GCStateStorage
}

// NewSafePointManagerV2 returns a new SafePointV2Manager.
func NewSafePointManagerV2(
ctx context.Context,
keyspaceStore endpoint.KeyspaceStorage,
v2Storage endpoint.SafePointV2Storage,
v1Storage endpoint.GCSafePointStorage,
v1Storage endpoint.GCStateStorage,
) *SafePointV2Manager {
return &SafePointV2Manager{
ctx: ctx,
Expand Down
33 changes: 24 additions & 9 deletions pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,34 @@ import (
// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
// This type is in sync with `client/http/types.go`.
// ServiceSafePoint is also directly used for storing GC barriers in order to make GC barriers in new versions
// can be backward-compatible with service safe points in old versions.
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// GCSafePointStorage defines the storage operations on the GC safe point.
type GCSafePointStorage interface {
LoadGCSafePoint() (uint64, error)
SaveGCSafePoint(safePoint uint64) error
LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error)
LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error)
SaveServiceGCSafePoint(ssp *ServiceSafePoint) error
RemoveServiceGCSafePoint(serviceID string) error
type GCBarrier struct {
BarrierID string `json:"barrier_id"`
BarrierTS uint64 `json:"barrier_ts"`
ExpirationTime string `json:"expiration_time"`
}

var _ GCSafePointStorage = (*StorageEndpoint)(nil)
// GCStateStorage defines the storage operations on the GC safe point.
type GCStateStorage interface {
LoadGCSafePoint(keyspaceID uint32) (uint64, error)
SaveGCSafePoint(keyspaceID uint32, target uint64) error
//LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error)
//LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error)
//SaveServiceGCSafePoint(ssp *ServiceSafePoint) error
//RemoveServiceGCSafePoint(serviceID string) error

UpdateTxnSafePoint(keyspaceID, target uint64) (newTxnSafePoint uint64, err error)
SetGCBarrier(keyspaceID uint32, barrierID string, barrierTS uint64, ttl time.Duration) error
}

var _ GCStateStorage = (*StorageEndpoint)(nil)

// LoadGCSafePoint loads current GC safe point from storage.
func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) {
Expand Down Expand Up @@ -186,3 +197,7 @@ func (se *StorageEndpoint) RemoveServiceGCSafePoint(serviceID string) error {
key := keypath.GCSafePointServicePath(serviceID)
return se.Remove(key)
}

func (se *StorageEndpoint) SetGCBarrier(keyspaceID uint32, barrierID string, barrierTS uint64, ttl time.Duration) error {

}
123 changes: 123 additions & 0 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"
"path"
"strings"
"time"
Expand Down Expand Up @@ -139,6 +140,13 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}

func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn {
return &lowLevelTxnWrapper{
inner: NewSlowLogTxn(kv.client),
rootPath: kv.rootPath,
}
}

// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
Expand Down Expand Up @@ -296,3 +304,118 @@ func (txn *etcdTxn) commit() error {
}
return nil
}

type lowLevelTxnWrapper struct {
inner clientv3.Txn
rootPath string
}

func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn {
cmpList := make([]clientv3.Cmp, 0, len(conditions))
for _, c := range conditions {
key := strings.Join([]string{l.rootPath, c.Key}, "/")
if c.CmpType == LowLevelCmpExists {
cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0))
} else if c.CmpType == LowLevelCmpNotExists {
cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
} else {
var cmpOp string
switch c.CmpType {
case LowLevelCmpEqual:
cmpOp = "="
case LowLevelCmpNotEqual:
cmpOp = "!="
case LowLevelCmpGreater:
cmpOp = ">"
case LowLevelCmpLess:
cmpOp = "<"
default:
panic(fmt.Sprintf("unknown cmp type %v", c.CmpType))
}
cmpList = append(cmpList, clientv3.Compare(clientv3.Value(key), cmpOp, c.Value))
}
}
l.inner = l.inner.If(cmpList...)
return l
}

func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op {
opsList := make([]clientv3.Op, 0, len(ops))
for _, op := range ops {
key := strings.Join([]string{l.rootPath, op.Key}, "/")
switch op.OpType {
case LowLevelOpPut:
opsList = append(opsList, clientv3.OpPut(key, op.Value))
case LowLevelOpDelete:
opsList = append(opsList, clientv3.OpDelete(key))
case LowLevelOpGet:
opsList = append(opsList, clientv3.OpGet(key))
case LowLevelOpGetRange:
if op.EndKey == "\x00" {
opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit))))
} else {
endKey := strings.Join([]string{l.rootPath, op.EndKey}, "/")
opsList = append(opsList, clientv3.OpGet(key, clientv3.WithRange(endKey), clientv3.WithLimit(int64(op.Limit))))
}
default:
panic(fmt.Sprintf("unknown op type %v", op.OpType))
}
}
return opsList
}

func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn {
l.inner = l.inner.Then(l.convertOps(ops)...)
return l
}

func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn {
l.inner = l.inner.Else(l.convertOps(ops)...)
return l
}

func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) {
resp, err := l.inner.Commit()
if err != nil {
return LowLevelTxnResult{}, err
}
items := make([]LowLevelTxnResultItem, 0, len(resp.Responses))
for i, respItem := range resp.Responses {
var resultItem LowLevelTxnResultItem
if put := respItem.GetResponsePut(); put != nil {
// Put and delete operations of etcd's transaction won't return any previous data. Skip handling it.
resultItem = LowLevelTxnResultItem{}
if put.PrevKv != nil {
key := strings.TrimPrefix(strings.TrimPrefix(string(put.PrevKv.Key), l.rootPath), "/")
resultItem.KeyValuePairs = []KeyValuePair{{
Key: key,
Value: string(put.PrevKv.Value),
}}
}
} else if del := respItem.GetResponseDeleteRange(); del != nil {
// Put and delete operations of etcd's transaction won't return any previous data. Skip handling it.
resultItem = LowLevelTxnResultItem{}
} else if rangeResp := respItem.GetResponseRange(); rangeResp != nil {
kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs))
for _, kv := range rangeResp.Kvs {
key := strings.TrimPrefix(strings.TrimPrefix(string(kv.Key), l.rootPath), "/")
kvs = append(kvs, KeyValuePair{
Key: key,
Value: string(kv.Value),
})
}
resultItem = LowLevelTxnResultItem{
KeyValuePairs: kvs,
}
} else {
return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs(
fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem),
)
}
items = append(items, resultItem)
}
return LowLevelTxnResult{
Succeeded: resp.Succeeded,
Items: items,
}, nil
}
Loading