Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keysapce: wait region split when creating keyspace #6414

Merged
merged 4 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 84 additions & 15 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package keyspace

import (
"bytes"
"context"
"strconv"
"time"
Expand Down Expand Up @@ -55,6 +56,9 @@ const (
// Config is the interface for keyspace config.
type Config interface {
GetPreAlloc() []string
ToWaitRegionSplit() bool
GetWaitRegionSplitTimeout() time.Duration
GetCheckRegionSplitInterval() time.Duration
}

// Manager manages keyspace related data.
Expand Down Expand Up @@ -86,6 +90,8 @@ type CreateKeyspaceRequest struct {
Config map[string]string
// CreateTime is the timestamp used to record creation time.
CreateTime int64
// IsPreAlloc indicates whether the keyspace is pre-allocated when the cluster starts.
IsPreAlloc bool
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand All @@ -112,7 +118,7 @@ func NewKeyspaceManager(
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID, false); err != nil {
return err
}
now := time.Now().Unix()
Expand Down Expand Up @@ -148,6 +154,7 @@ func (manager *Manager) Bootstrap() error {
req := &CreateKeyspaceRequest{
Name: keyspaceName,
CreateTime: now,
IsPreAlloc: true,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
Expand All @@ -162,6 +169,11 @@ func (manager *Manager) Bootstrap() error {
return nil
}

// UpdateConfig update keyspace manager's config.
func (manager *Manager) UpdateConfig(cfg Config) {
manager.config = cfg
}

// CreateKeyspace create a keyspace meta with given config and save it to storage.
func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspacepb.KeyspaceMeta, error) {
// Validate purposed name's legality.
Expand All @@ -173,8 +185,11 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID)
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,27 +267,81 @@ func (manager *Manager) saveNewKeyspace(keyspace *keyspacepb.KeyspaceMeta) error

// splitKeyspaceRegion add keyspace's boundaries to region label. The corresponding
// region will then be split by Coordinator's patrolRegion.
func (manager *Manager) splitKeyspaceRegion(id uint32) error {
func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (err error) {
failpoint.Inject("skipSplitRegion", func() {
failpoint.Return(nil)
})

start := time.Now()
keyspaceRule := makeLabelRule(id)
if cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
err := cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Error(err),
)
}
log.Info("[keyspace] added region label for keyspace",
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
}
err = cl.GetRegionLabeler().SetLabelRule(keyspaceRule)
if err != nil {
log.Warn("[keyspace] failed to add region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Any("LabelRule", keyspaceRule),
zap.Error(err),
)
return nil
return err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when meeting this error it will return nil in pd master, but it returns an error in pd-cse

}
return errors.New("cluster does not support region label")
defer func() {
if err != nil {
cl.GetRegionLabeler().DeleteLabelRule(keyspaceRule.ID)
}
}()

if waitRegionSplit {
ranges := keyspaceRule.Data.([]*labeler.KeyRangeRule)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check the length?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added. cc @zeminzhou

rawLeftBound, rawRightBound := ranges[0].StartKey, ranges[0].EndKey
txnLeftBound, txnRightBound := ranges[1].StartKey, ranges[1].EndKey

ticker := time.NewTicker(manager.config.GetCheckRegionSplitInterval())
timer := time.NewTimer(manager.config.GetWaitRegionSplitTimeout())
defer func() {
ticker.Stop()
timer.Stop()
}()
for {
select {
case <-ticker.C:
regionsInfo := manager.cluster.GetBasicCluster().RegionsInfo
region := regionsInfo.GetRegionByKey(rawLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(rawRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), rawRightBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnLeftBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnLeftBound) {
continue
}
region = regionsInfo.GetRegionByKey(txnRightBound)
if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) {
continue
}
case <-timer.C:
log.Warn("[keyspace] wait region split timeout",
zap.Uint32("keyspaceID", id),
zap.Error(err),
)
err = ErrRegionSplitTimeout
return
}
log.Info("[keyspace] wait reigon split successfully", zap.Uint32("keyspaceID", id))
break
}
}

log.Info("[keyspace] added region label for keyspace",
zap.Uint32("keyspaceID", id),
zap.Any("LabelRule", keyspaceRule),
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
zap.Duration("takes", time.Since(start)),
)
return
}

// LoadKeyspace returns the keyspace specified by name.
Expand Down
23 changes: 21 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
)

const (
Expand All @@ -51,10 +52,27 @@ func TestKeyspaceTestSuite(t *testing.T) {
}

type mockConfig struct {
PreAlloc []string
PreAlloc []string
WaitRegionSplit bool
WaitRegionSplitTimeout typeutil.Duration
CheckRegionSplitInterval typeutil.Duration
}

func (m *mockConfig) GetPreAlloc() []string { return m.PreAlloc }
func (m *mockConfig) GetPreAlloc() []string {
return m.PreAlloc
}

func (m *mockConfig) ToWaitRegionSplit() bool {
return m.WaitRegionSplit
}

func (m *mockConfig) GetWaitRegionSplitTimeout() time.Duration {
return m.WaitRegionSplitTimeout.Duration
}

func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
return m.CheckRegionSplitInterval.Duration
}

func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -89,6 +107,7 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
testConfig2: "200",
},
CreateTime: now,
IsPreAlloc: true, // skip wait region split
}
}
return requests
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
var (
// ErrKeyspaceNotFound is used to indicate target keyspace does not exist.
ErrKeyspaceNotFound = errors.New("keyspace does not exist")
// ErrRegionSplitTimeout indices to split region timeout
ErrRegionSplitTimeout = errors.New("region split timeout")
// ErrKeyspaceExists indicates target keyspace already exists.
// It's used when creating a new keyspace.
ErrKeyspaceExists = errors.New("keyspace already exists")
Expand Down
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,28 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "cluster-version":
return h.updateClusterVersion(value)
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}

func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Keyspace, key, value)
if err != nil {
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetKeyspaceConfig(config.Keyspace)
}
return err
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func CreateKeyspace(c *gin.Context) {
Name: createParams.Name,
Config: createParams.Config,
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
}
meta, err := manager.CreateKeyspace(req)
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ const (
defaultGCTunerThreshold = 0.6
minGCTunerThreshold = 0
maxGCTunerThreshold = 0.9

defaultWaitRegionSplitTimeout = 30 * time.Second
defaultCheckRegionSplitInterval = 50 * time.Millisecond
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond
)

// Special keys for Labels
Expand Down Expand Up @@ -496,6 +501,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.ReplicationMode.adjust(configMetaData.Child("replication-mode"))

c.Keyspace.adjust(configMetaData.Child("keyspace"))

c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
Expand Down Expand Up @@ -1399,9 +1406,62 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
PreAlloc []string `toml:"pre-alloc" json:"pre-alloc"`
// WaitRegionSplit indicates whether to wait for the region split to complete
WaitRegionSplit bool `toml:"wait-region-split" json:"wait-region-split"`
// WaitRegionSplitTimeout indicates the max duration to wait region split.
WaitRegionSplitTimeout typeutil.Duration `toml:"wait-region-split-timeout" json:"wait-region-split-timeout"`
// CheckRegionSplitInterval indicates the interval to check whether the region split is complete
CheckRegionSplitInterval typeutil.Duration `toml:"check-region-split-interval" json:"check-region-split-interval"`
}

// Validate checks if keyspace config falls within acceptable range.
func (c *KeyspaceConfig) Validate() error {
if c.CheckRegionSplitInterval.Duration > maxCheckRegionSplitInterval || c.CheckRegionSplitInterval.Duration < minCheckRegionSplitInterval {
return errors.New(fmt.Sprintf("[keyspace] check-region-split-interval should between %v and %v",
minCheckRegionSplitInterval, maxCheckRegionSplitInterval))
}
if c.CheckRegionSplitInterval.Duration >= c.WaitRegionSplitTimeout.Duration {
return errors.New("[keyspace] check-region-split-interval should be less than wait-region-split-timeout")
}
return nil
}

func (c *KeyspaceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("wait-region-split") {
c.WaitRegionSplit = true
}
if !meta.IsDefined("wait-region-split-timeout") {
c.WaitRegionSplitTimeout = typeutil.NewDuration(defaultWaitRegionSplitTimeout)
}
if !meta.IsDefined("check-region-split-interval") {
c.CheckRegionSplitInterval = typeutil.NewDuration(defaultCheckRegionSplitInterval)
}
}

// Clone makes a deep copy of the keyspace config.
func (c *KeyspaceConfig) Clone() *KeyspaceConfig {
preAlloc := append(c.PreAlloc[:0:0], c.PreAlloc...)
cfg := *c
cfg.PreAlloc = preAlloc
return &cfg
}

// GetPreAlloc returns the keyspace to be allocated during keyspace manager initialization.
func (c *KeyspaceConfig) GetPreAlloc() []string {
return c.PreAlloc
}

// ToWaitRegionSplit returns whether to wait for the region split to complete.
func (c *KeyspaceConfig) ToWaitRegionSplit() bool {
return c.WaitRegionSplit
}

// GetWaitRegionSplitTimeout returns the max duration to wait region split.
func (c *KeyspaceConfig) GetWaitRegionSplitTimeout() time.Duration {
return c.WaitRegionSplitTimeout.Duration
}

// GetCheckRegionSplitInterval returns the interval to check whether the region split is complete.
func (c *KeyspaceConfig) GetCheckRegionSplitInterval() time.Duration {
return c.CheckRegionSplitInterval.Duration
}
14 changes: 14 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PersistOptions struct {
pdServerConfig atomic.Value
replicationMode atomic.Value
labelProperty atomic.Value
keyspace atomic.Value
clusterVersion unsafe.Pointer
}

Expand All @@ -62,6 +63,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.pdServerConfig.Store(&cfg.PDServerCfg)
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.SetClusterVersion(&cfg.ClusterVersion)
o.ttl = nil
return o
Expand Down Expand Up @@ -117,6 +119,16 @@ func (o *PersistOptions) SetLabelPropertyConfig(cfg LabelPropertyConfig) {
o.labelProperty.Store(cfg)
}

// GetKeyspaceConfig returns the keyspace config.
func (o *PersistOptions) GetKeyspaceConfig() *KeyspaceConfig {
return o.keyspace.Load().(*KeyspaceConfig)
}

// SetKeyspaceConfig sets the keyspace configuration.
func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
o.keyspace.Store(cfg)
}

// GetClusterVersion returns the cluster version.
func (o *PersistOptions) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand Down Expand Up @@ -736,6 +748,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
PDServerCfg: *o.GetPDServerConfig(),
ReplicationMode: *o.GetReplicationModeConfig(),
LabelProperty: o.GetLabelPropertyConfig(),
Keyspace: *o.GetKeyspaceConfig(),
ClusterVersion: *o.GetClusterVersion(),
}
err := storage.SaveConfig(cfg)
Expand Down Expand Up @@ -763,6 +776,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.pdServerConfig.Store(&cfg.PDServerCfg)
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.SetClusterVersion(&cfg.ClusterVersion)
}
return nil
Expand Down
Loading