Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: unify the keyspace naming #5863

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions pkg/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ import (
"go.uber.org/zap"
)

// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeySpaceGCSafePoint struct {
// KeyspaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeyspaceGCSafePoint struct {
SpaceID string `json:"space_id"`
SafePoint uint64 `json:"safe_point,omitempty"`
}

// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
type KeySpaceGCSafePointStorage interface {
// KeyspaceGCSafePointStorage defines the storage operations on Keyspaces' safe points
type KeyspaceGCSafePointStorage interface {
// Service safe point interfaces.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeyspaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error)
}

var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)
var _ KeyspaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
key := KeyspaceServiceSafePointPath(spaceID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
Expand All @@ -66,7 +66,7 @@ func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafe
// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
// Return nil if no safepoint exist for given service or just expired.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
key := KeyspaceServiceSafePointPath(spaceID, serviceID)
value, err := se.Load(key)
if err != nil || value == "" {
return nil, err
Expand All @@ -90,7 +90,7 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
prefix := KeySpaceServiceSafePointPrefix(spaceID)
prefix := KeyspaceServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand Down Expand Up @@ -141,20 +141,20 @@ func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time

// RemoveServiceSafePoint removes target ServiceSafePoint
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
key := KeyspaceServiceSafePointPath(spaceID, serviceID)
return se.Remove(key)
}

// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
// SaveKeyspaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
return se.Save(KeySpaceGCSafePointPath(spaceID), value)
return se.Save(KeyspaceGCSafePointPath(spaceID), value)
}

// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
// LoadKeyspaceGCSafePoint reads GCSafePoint for the given key-space.
// Returns 0 if target safepoint not exist.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
func (se *StorageEndpoint) LoadKeyspaceGCSafePoint(spaceID string) (uint64, error) {
value, err := se.Load(KeyspaceGCSafePointPath(spaceID))
if err != nil || value == "" {
return 0, err
}
Expand All @@ -165,23 +165,23 @@ func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, erro
return safePoint, nil
}

// LoadAllKeySpaceGCSafePoints returns slice of KeySpaceGCSafePoint.
// LoadAllKeyspaceGCSafePoints returns slice of KeyspaceGCSafePoint.
// If withGCSafePoint set to false, returned safePoints will be 0.
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
prefix := KeySpaceSafePointPrefix()
func (se *StorageEndpoint) LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error) {
prefix := KeyspaceSafePointPrefix()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
suffix := KeySpaceGCSafePointSuffix()
suffix := KeyspaceGCSafePointSuffix()
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
safePoints := make([]*KeyspaceGCSafePoint, 0, len(values))
for i := range keys {
// skip non gc safe points
if !strings.HasSuffix(keys[i], suffix) {
continue
}
safePoint := &KeySpaceGCSafePoint{}
safePoint := &KeyspaceGCSafePoint{}
spaceID := strings.TrimPrefix(keys[i], prefix)
spaceID = strings.TrimSuffix(spaceID, suffix)
safePoint.SpaceID = spaceID
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,33 +148,33 @@ func ExternalTimestampPath() string {
return path.Join(clusterPath, externalTimeStamp)
}

// KeySpaceServiceSafePointPrefix returns the prefix of given service's service safe point.
// KeyspaceServiceSafePointPrefix returns the prefix of given service's service safe point.
// Prefix: /keyspaces/gc_safepoint/{space_id}/service/
func KeySpaceServiceSafePointPrefix(spaceID string) string {
func KeyspaceServiceSafePointPrefix(spaceID string) string {
return path.Join(keyspaceSafePointPrefix, spaceID, "service") + "/"
}

// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
// KeyspaceGCSafePointPath returns the gc safe point's path of the given key-space.
// Path: /keyspaces/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
func KeyspaceGCSafePointPath(spaceID string) string {
return path.Join(keyspaceSafePointPrefix, spaceID, keyspaceGCSafePointSuffix)
}

// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
// KeyspaceServiceSafePointPath returns the path of given service's service safe point.
// Path: /keyspaces/gc_safepoint/{space_id}/service/{service_id}
func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
func KeyspaceServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(KeyspaceServiceSafePointPrefix(spaceID), serviceID)
}

// KeySpaceSafePointPrefix returns prefix for all key-spaces' safe points.
// KeyspaceSafePointPrefix returns prefix for all key-spaces' safe points.
// Path: /keyspaces/gc_safepoint/
func KeySpaceSafePointPrefix() string {
func KeyspaceSafePointPrefix() string {
return keyspaceSafePointPrefix + "/"
}

// KeySpaceGCSafePointSuffix returns the suffix for any gc safepoint.
// KeyspaceGCSafePointSuffix returns the suffix for any gc safepoint.
// Postfix: /gc
func KeySpaceGCSafePointSuffix() string {
func KeyspaceGCSafePointSuffix() string {
return "/" + keyspaceGCSafePointSuffix
}

Expand Down
2 changes: 1 addition & 1 deletion server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Storage interface {
endpoint.GCSafePointStorage
endpoint.MinResolvedTSStorage
endpoint.ExternalTSStorage
endpoint.KeySpaceGCSafePointStorage
endpoint.KeyspaceGCSafePointStorage
endpoint.KeyspaceStorage
endpoint.ResourceGroupStorage
}
Expand Down
32 changes: 16 additions & 16 deletions server/storage/storage_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,28 @@ func TestLoadMinServiceSafePoint(t *testing.T) {
{ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 300},
}

testKeySpace := "test"
testKeyspace := "test"
for _, serviceSafePoint := range serviceSafePoints {
re.NoError(storage.SaveServiceSafePoint(testKeySpace, serviceSafePoint))
re.NoError(storage.SaveServiceSafePoint(testKeyspace, serviceSafePoint))
}
// enabling failpoint to make expired key removal immediately observable
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys", "return(true)"))
minSafePoint, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime)
minSafePoint, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime)
re.NoError(err)
re.Equal(serviceSafePoints[0], minSafePoint)

// the safePoint with ServiceID 0 should be removed due to expiration
minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(150*time.Second))
minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(150*time.Second))
re.NoError(err)
re.Equal(serviceSafePoints[1], minSafePoint2)

// verify that service safe point with ServiceID 0 has been removed
ssp, err := storage.LoadServiceSafePoint(testKeySpace, "0")
ssp, err := storage.LoadServiceSafePoint(testKeyspace, "0")
re.NoError(err)
re.Nil(ssp)

// all remaining service safePoints should be removed due to expiration
ssp, err = storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(500*time.Second))
ssp, err = storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(500*time.Second))
re.NoError(err)
re.Nil(ssp)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys"))
Expand Down Expand Up @@ -151,23 +151,23 @@ func TestSaveLoadGCSafePoint(t *testing.T) {
for i := range testSpaceIDs {
testSpaceID := testSpaceIDs[i]
testSafePoint := testSafePoints[i]
err := storage.SaveKeySpaceGCSafePoint(testSpaceID, testSafePoint)
err := storage.SaveKeyspaceGCSafePoint(testSpaceID, testSafePoint)
re.NoError(err)
loaded, err := storage.LoadKeySpaceGCSafePoint(testSpaceID)
loaded, err := storage.LoadKeyspaceGCSafePoint(testSpaceID)
re.NoError(err)
re.Equal(testSafePoint, loaded)
}
}

func TestLoadAllKeySpaceGCSafePoints(t *testing.T) {
func TestLoadAllKeyspaceGCSafePoints(t *testing.T) {
re := require.New(t)
storage := NewStorageWithMemoryBackend()
testSpaceIDs, testSafePoints := testGCSafePoints()
for i := range testSpaceIDs {
err := storage.SaveKeySpaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])
err := storage.SaveKeyspaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])
re.NoError(err)
}
loadedSafePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
loadedSafePoints, err := storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
Expand All @@ -181,15 +181,15 @@ func TestLoadAllKeySpaceGCSafePoints(t *testing.T) {
}

// verify that service safe points do not interfere with gc safe points.
loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(true)
loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint)
}

// verify that when withGCSafePoint set to false, returned safePoints is 0
loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(false)
loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(false)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
Expand All @@ -202,17 +202,17 @@ func TestLoadEmpty(t *testing.T) {
storage := NewStorageWithMemoryBackend()

// loading non-existing GC safepoint should return 0
gcSafePoint, err := storage.LoadKeySpaceGCSafePoint("testKeySpace")
gcSafePoint, err := storage.LoadKeyspaceGCSafePoint("testKeyspace")
re.NoError(err)
re.Equal(uint64(0), gcSafePoint)

// loading non-existing service safepoint should return nil
serviceSafePoint, err := storage.LoadServiceSafePoint("testKeySpace", "testService")
serviceSafePoint, err := storage.LoadServiceSafePoint("testKeyspace", "testService")
re.NoError(err)
re.Nil(serviceSafePoint)

// loading empty key spaces should return empty slices
safePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
safePoints, err := storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
re.Empty(safePoints)
}