Skip to content

Commit

Permalink
keysapce: wait region split when creating keyspace (tikv#6414)
Browse files Browse the repository at this point in the history
ref tikv#6231

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: zzm <zhouzemin@pingcap.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and rleungx committed Aug 2, 2023
1 parent 2f00984 commit 06cfdba
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 30 deletions.
125 changes: 99 additions & 26 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package keyspace

import (
"bytes"
"context"
"strconv"
"time"
Expand Down Expand Up @@ -55,6 +56,9 @@ const (
// Config is the interface for keyspace config.
type Config interface {
GetPreAlloc() []string
ToWaitRegionSplit() bool
GetWaitRegionSplitTimeout() time.Duration
GetCheckRegionSplitInterval() time.Duration
}

// Manager manages keyspace related data.
Expand Down Expand Up @@ -86,6 +90,8 @@ type CreateKeyspaceRequest struct {
Config map[string]string
// CreateTime is the timestamp used to record creation time.
CreateTime int64
// IsPreAlloc indicates whether the keyspace is pre-allocated when the cluster starts.
IsPreAlloc bool
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand All @@ -112,7 +118,7 @@ func NewKeyspaceManager(
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID, false); err != nil {
return err
}
now := time.Now().Unix()
Expand Down Expand Up @@ -148,6 +154,7 @@ func (manager *Manager) Bootstrap() error {
req := &CreateKeyspaceRequest{
Name: keyspaceName,
CreateTime: now,
IsPreAlloc: true,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
Expand All @@ -162,6 +169,11 @@ func (manager *Manager) Bootstrap() error {
return nil
}

// UpdateConfig update keyspace manager's config.
func (manager *Manager) UpdateConfig(cfg Config) {
manager.config = cfg
}

// CreateKeyspace create a keyspace meta with given config and save it to storage.
func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspacepb.KeyspaceMeta, error) {
// Validate purposed name's legality.
Expand All @@ -173,8 +185,11 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID)
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -203,7 +218,7 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
err = manager.saveNewKeyspace(keyspace)
if err != nil {
log.Warn("[keyspace] failed to create keyspace",
zap.Uint32("ID", keyspace.GetId()),
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err),
)
Expand All @@ -213,7 +228,7 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
return nil, err
}
log.Info("[keyspace] keyspace created",
zap.Uint32("ID", keyspace.GetId()),
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
)
return keyspace, nil
Expand Down Expand Up @@ -252,27 +267,85 @@ func (manager *Manager) saveNewKeyspace(keyspace *keyspacepb.KeyspaceMeta) error

// splitKeyspaceRegion add keyspace's boundaries to region label. The corresponding
// region will then be split by Coordinator's patrolRegion.
func (manager *Manager) splitKeyspaceRegion(id uint32) error {
func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (err error) {
failpoint.Inject("skipSplitRegion", func() {
failpoint.Return(nil)
})

start := time.Now()
keyspaceRule := makeLabelRule(id)
if cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
err := cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
}
err = cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspace-id", id),
zap.Error(err),
)
return err
}
defer func() {
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Error(err),
)
cl.GetRegionLabeler().DeleteLabelRule(keyspaceRule.ID)
}
}()

if waitRegionSplit {
ranges := keyspaceRule.Data.([]*labeler.KeyRangeRule)
if len(ranges) < 2 {
log.Warn("[keyspace] failed to split keyspace region with insufficient range", zap.Any("label-rule", keyspaceRule))
return ErrRegionSplitFailed
}
rawLeftBound, rawRightBound := ranges[0].StartKey, ranges[0].EndKey
txnLeftBound, txnRightBound := ranges[1].StartKey, ranges[1].EndKey

ticker := time.NewTicker(manager.config.GetCheckRegionSplitInterval())
timer := time.NewTimer(manager.config.GetWaitRegionSplitTimeout())
defer func() {
ticker.Stop()
timer.Stop()
}()
for {
select {
case <-ticker.C:
regionsInfo := manager.cluster.GetBasicCluster().RegionsInfo
region := regionsInfo.GetRegionByKey(rawLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(rawRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawRightBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) {
continue
}
case <-timer.C:
log.Warn("[keyspace] wait region split timeout",
zap.Uint32("keyspace-id", id),
zap.Error(err),
)
err = ErrRegionSplitTimeout
return
}
log.Info("[keyspace] wait region split successfully", zap.Uint32("keyspace-id", id))
break
}
log.Info("[keyspace] added region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Any("LabelRule", keyspaceRule),
)
return nil
}
return errors.New("cluster does not support region label")

log.Info("[keyspace] added region label for keyspace",
zap.Uint32("keyspace-id", id),
zap.Any("label-rule", keyspaceRule),
zap.Duration("takes", time.Since(start)),
)
return
}

// LoadKeyspace returns the keyspace specified by name.
Expand Down Expand Up @@ -413,16 +486,16 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)

if err != nil {
log.Warn("[keyspace] failed to update keyspace config",
zap.Uint32("ID", meta.GetId()),
zap.Uint32("keyspace-id", meta.GetId()),
zap.String("name", meta.GetName()),
zap.Error(err),
)
return nil, err
}
log.Info("[keyspace] keyspace config updated",
zap.Uint32("ID", meta.GetId()),
zap.Uint32("keyspace-id", meta.GetId()),
zap.String("name", meta.GetName()),
zap.Any("new config", meta.GetConfig()),
zap.Any("new-config", meta.GetConfig()),
)
return meta, nil
}
Expand Down Expand Up @@ -465,16 +538,16 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
})
if err != nil {
log.Warn("[keyspace] failed to update keyspace config",
zap.Uint32("ID", meta.GetId()),
zap.Uint32("keyspace-id", meta.GetId()),
zap.String("name", meta.GetName()),
zap.Error(err),
)
return nil, err
}
log.Info("[keyspace] keyspace state updated",
zap.Uint32("ID", meta.GetId()),
zap.String("name", meta.GetName()),
zap.String("new state", newState.String()),
zap.String("keyspace-id", meta.GetName()),
zap.String("new-state", newState.String()),
)
return meta, nil
}
Expand Down Expand Up @@ -510,16 +583,16 @@ func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.K
})
if err != nil {
log.Warn("[keyspace] failed to update keyspace config",
zap.Uint32("ID", meta.GetId()),
zap.Uint32("keyspace-id", meta.GetId()),
zap.String("name", meta.GetName()),
zap.Error(err),
)
return nil, err
}
log.Info("[keyspace] keyspace state updated",
zap.Uint32("ID", meta.GetId()),
zap.Uint32("keyspace-id", meta.GetId()),
zap.String("name", meta.GetName()),
zap.String("new state", newState.String()),
zap.String("new-state", newState.String()),
)
return meta, nil
}
Expand Down
23 changes: 21 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
)

const (
Expand All @@ -51,10 +52,27 @@ func TestKeyspaceTestSuite(t *testing.T) {
}

type mockConfig struct {
PreAlloc []string
PreAlloc []string
WaitRegionSplit bool
WaitRegionSplitTimeout typeutil.Duration
CheckRegionSplitInterval typeutil.Duration
}

func (m *mockConfig) GetPreAlloc() []string { return m.PreAlloc }
func (m *mockConfig) GetPreAlloc() []string {
return m.PreAlloc
}

func (m *mockConfig) ToWaitRegionSplit() bool {
return m.WaitRegionSplit
}

func (m *mockConfig) GetWaitRegionSplitTimeout() time.Duration {
return m.WaitRegionSplitTimeout.Duration
}

func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
return m.CheckRegionSplitInterval.Duration
}

func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -89,6 +107,7 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
testConfig2: "200",
},
CreateTime: now,
IsPreAlloc: true, // skip wait region split
}
}
return requests
Expand Down
4 changes: 4 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
var (
// ErrKeyspaceNotFound is used to indicate target keyspace does not exist.
ErrKeyspaceNotFound = errors.New("keyspace does not exist")
// ErrRegionSplitTimeout indices to split region timeout
ErrRegionSplitTimeout = errors.New("region split timeout")
// ErrRegionSplitFailed indices to split region failed
ErrRegionSplitFailed = errors.New("region split failed")
// ErrKeyspaceExists indicates target keyspace already exists.
// It's used when creating a new keyspace.
ErrKeyspaceExists = errors.New("keyspace already exists")
Expand Down
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,28 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "cluster-version":
return h.updateClusterVersion(value)
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}

func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Keyspace, key, value)
if err != nil {
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetKeyspaceConfig(config.Keyspace)
}
return err
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func CreateKeyspace(c *gin.Context) {
Name: createParams.Name,
Config: createParams.Config,
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
}
meta, err := manager.CreateKeyspace(req)
if err != nil {
Expand Down
Loading

0 comments on commit 06cfdba

Please sign in to comment.