Merge branch 'tikv:master' into tso-load-ksg-assignment
binshi-bing authored Mar 30, 2023
2 parents b2816a1 + 21a333c commit c578432
Showing 32 changed files with 158 additions and 68 deletions.
4 changes: 2 additions & 2 deletions pkg/core/store.go
@@ -137,10 +137,10 @@ func (s *StoreInfo) IsEvictedAsSlowTrend() bool {
 }
 
 // IsAvailable returns if the store bucket of limitation is available
-func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
+func (s *StoreInfo) IsAvailable(limitType storelimit.Type, level constant.PriorityLevel) bool {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	return s.limiter.Available(storelimit.RegionInfluence[limitType], limitType, constant.Low)
+	return s.limiter.Available(storelimit.RegionInfluence[limitType], limitType, level)
 }
 
 // IsTiFlash returns true if the store is tiflash.
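With this change, callers state the priority level of their operator explicitly instead of relying on the previous hard-coded constant.Low. A minimal sketch of the new call site, assuming store is a *core.StoreInfo obtained from the cluster:

// Sketch only: consult the add-peer budget at High priority before scheduling.
if store.IsAvailable(storelimit.AddPeer, constant.High) {
	// safe to create an AddPeer operator against this store
}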
9 changes: 9 additions & 0 deletions pkg/core/store_option.go
@@ -259,6 +259,15 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption {
 	}
 }
 
+// SetStoreLimit sets the limiter for a store; it may switch the store limit mode.
+func SetStoreLimit(limit storelimit.StoreLimit) StoreCreateOption {
+	return func(store *StoreInfo) {
+		store.mu.Lock()
+		defer store.mu.Unlock()
+		store.limiter = limit
+	}
+}
+
 // SetLastAwakenTime sets last awaken time for the store.
 func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
 	return func(store *StoreInfo) {
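For illustration, the new option is applied when cloning a StoreInfo, mirroring its use in filters_test.go further down; the window capacity of 1000 is arbitrary:

// Sketch: swap the store's limiter for a sliding-window limiter.
store = store.Clone(core.SetStoreLimit(storelimit.NewSlidingWindows(1000)))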
4 changes: 2 additions & 2 deletions pkg/mcs/discovery/discover.go
@@ -20,8 +20,8 @@ import (
 )
 
 // Discover is used to get all the service instances of the specified service name.
-func Discover(cli *clientv3.Client, serviceName string) ([]string, error) {
-	key := discoveryPath(serviceName) + "/"
+func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
+	key := discoveryPath(clusterID, serviceName) + "/"
 	endKey := clientv3.GetPrefixRangeEnd(key) + "/"
 
 	withRange := clientv3.WithRange(endKey)
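A usage sketch: discovery is now scoped to a single cluster, so two PD clusters sharing one etcd no longer see each other's registrations. The cluster ID and service name below are illustrative:

// Sketch: list all registered instances of "test_service" in cluster 12345.
endpoints, err := discovery.Discover(client, "12345", "test_service")
if err != nil {
	return err
}
for _, ep := range endpoints {
	log.Info("discovered instance", zap.String("addr", ep))
}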
16 changes: 8 additions & 8 deletions pkg/mcs/discovery/discover_test.go
@@ -41,14 +41,14 @@ func TestDiscover(t *testing.T) {
 	re.NoError(err)
 
 	<-etcd.Server.ReadyNotify()
-	sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
+	sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
 	err = sr1.Register()
 	re.NoError(err)
-	sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
+	sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
 	err = sr2.Register()
 	re.NoError(err)
 
-	endpoints, err := Discover(client, "test_service")
+	endpoints, err := Discover(client, "12345", "test_service")
 	re.NoError(err)
 	re.Len(endpoints, 2)
 	re.Equal("127.0.0.1:1", endpoints[0])
@@ -57,7 +57,7 @@ func TestDiscover(t *testing.T) {
 	sr1.cancel()
 	sr2.cancel()
 	time.Sleep(3 * time.Second)
-	endpoints, err = Discover(client, "test_service")
+	endpoints, err = Discover(client, "12345", "test_service")
 	re.NoError(err)
 	re.Empty(endpoints)
 }
@@ -81,17 +81,17 @@ func TestServiceRegistryEntry(t *testing.T) {
 	entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
 	s1, err := entry1.Serialize()
 	re.NoError(err)
-	sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
+	sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
 	err = sr1.Register()
 	re.NoError(err)
 	entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
 	s2, err := entry2.Serialize()
 	re.NoError(err)
-	sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
+	sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
 	err = sr2.Register()
 	re.NoError(err)
 
-	endpoints, err := Discover(client, "test_service")
+	endpoints, err := Discover(client, "12345", "test_service")
 	re.NoError(err)
 	re.Len(endpoints, 2)
 	returnedEntry1 := &ServiceRegistryEntry{}
@@ -104,7 +104,7 @@ func TestServiceRegistryEntry(t *testing.T) {
 	sr1.cancel()
 	sr2.cancel()
 	time.Sleep(3 * time.Second)
-	endpoints, err = Discover(client, "test_service")
+	endpoints, err = Discover(client, "12345", "test_service")
 	re.NoError(err)
 	re.Empty(endpoints)
 }
12 changes: 6 additions & 6 deletions pkg/mcs/discovery/key_path.go
@@ -14,17 +14,17 @@
 
 package discovery
 
-import "path"
+import "strings"
 
 const (
-	registryPrefix = "/pd/microservice"
+	registryPrefix = "/ms"
 	registryKey    = "registry"
 )
 
-func registryPath(serviceName, serviceAddr string) string {
-	return path.Join(registryPrefix, serviceName, registryKey, serviceAddr)
+func registryPath(clusterID, serviceName, serviceAddr string) string {
+	return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
 }
 
-func discoveryPath(serviceName string) string {
-	return path.Join(registryPrefix, serviceName, registryKey)
+func discoveryPath(clusterID, serviceName string) string {
+	return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
 }
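The move from path.Join to strings.Join is deliberate: a registered address may carry a scheme, and path.Join would collapse the double slash in "http://" to a single one, while strings.Join preserves it. The resulting key layout, matching the assertion in register_test.go below:

registryPath("12345", "test_service", "http://127.0.0.1:1")
// -> "/ms/12345/test_service/registry/http://127.0.0.1:1"
discoveryPath("12345", "test_service")
// -> "/ms/12345/test_service/registry"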
4 changes: 2 additions & 2 deletions pkg/mcs/discovery/register.go
@@ -39,9 +39,9 @@ type ServiceRegister struct {
 }
 
 // NewServiceRegister creates a new ServiceRegister.
-func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
+func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
 	cctx, cancel := context.WithCancel(ctx)
-	serviceKey := registryPath(serviceName, serviceAddr)
+	serviceKey := registryPath(clusterID, serviceName, serviceAddr)
 	return &ServiceRegister{
 		ctx:    cctx,
 		cancel: cancel,
8 changes: 5 additions & 3 deletions pkg/mcs/discovery/register_test.go
@@ -39,21 +39,23 @@ func TestRegister(t *testing.T) {
 	re.NoError(err)
 
 	<-etcd.Server.ReadyNotify()
-	sr := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 10)
+	// with http prefix
+	sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
 	re.NoError(err)
 	err = sr.Register()
 	re.NoError(err)
+	re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key)
 	resp, err := client.Get(context.Background(), sr.key)
 	re.NoError(err)
-	re.Equal("127.0.0.1:1", string(resp.Kvs[0].Value))
+	re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))
 
 	err = sr.Deregister()
 	re.NoError(err)
 	resp, err = client.Get(context.Background(), sr.key)
 	re.NoError(err)
 	re.Empty(resp.Kvs)
 
-	sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
+	sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
 	re.NoError(err)
 	err = sr.Register()
 	re.NoError(err)
6 changes: 4 additions & 2 deletions pkg/mcs/resource_manager/server/server.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"os/signal"
 	"path"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -399,12 +400,13 @@ func (s *Server) startServer() (err error) {
 	}
 
 	// Server has started.
-	entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
+	entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
 	serializedEntry, err := entry.Serialize()
 	if err != nil {
 		return err
 	}
-	s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, utils.ResourceManagerServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
+	s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
+		utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
 	if err := s.serviceRegister.Register(); err != nil {
 		log.Error("failed to regiser the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err))
 		return err
7 changes: 4 additions & 3 deletions pkg/mcs/tso/server/server.go
@@ -466,14 +466,15 @@ func (s *Server) startServer() (err error) {
 	}
 
 	// Server has started.
-	entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
+	entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
 	serializedEntry, err := entry.Serialize()
 	if err != nil {
 		return err
 	}
-	s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, mcsutils.TSOServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
+	s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
+		mcsutils.TSOServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
 	if err := s.serviceRegister.Register(); err != nil {
-		log.Error("failed to regiser the service", zap.String("service-name", mcsutils.TSOServiceName), errs.ZapError(err))
+		log.Error("failed to register the service", zap.String("service-name", mcsutils.TSOServiceName), errs.ZapError(err))
 		return err
 	}
 
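Both servers now register the advertised address under a cluster-scoped key. The distinction matters because the bind address may not be dialable by peers; a sketch, with an illustrative cluster ID and address:

// ListenAddr may be "http://0.0.0.0:3379", which other nodes cannot dial;
// AdvertiseListenAddr (e.g. "http://pd-tso-0:3379") is what gets registered:
//   /ms/12345/tso/registry/http://pd-tso-0:3379
// (assuming mcsutils.TSOServiceName resolves to "tso")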
15 changes: 12 additions & 3 deletions pkg/schedule/checker/replica_strategy.go
@@ -17,6 +17,7 @@ package checker
 import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/schedule"
 	"github.com/tikv/pd/pkg/schedule/filter"
 	"go.uber.org/zap"
@@ -53,11 +54,15 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 	//
 	// The reason for it is to prevent the non-optimal replica placement due
 	// to the short-term state, resulting in redundant scheduling.
+	level := constant.High
+	if s.fastFailover {
+		level = constant.Urgent
+	}
 	filters := []filter.Filter{
 		filter.NewExcludedFilter(s.checkerName, nil, s.region.GetStoreIDs()),
 		filter.NewStorageThresholdFilter(s.checkerName),
 		filter.NewSpecialUseFilter(s.checkerName),
-		&filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowTemporaryStates: true},
+		&filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowTemporaryStates: true, OperatorLevel: level},
 	}
 	if len(s.locationLabels) > 0 && s.isolationLevel != "" {
 		filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores))
@@ -70,7 +75,7 @@
 	}
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
-	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover}
+	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
 	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
 		FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
 		KeepTheTopStores(isolationComparer, false) // greater isolation score is better
@@ -123,8 +128,12 @@ func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64)
 // SelectStoreToRemove returns the best option to remove from the region.
 func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 {
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
+	level := constant.High
+	if s.fastFailover {
+		level = constant.Urgent
+	}
 	source := filter.NewCandidates(coLocationStores).
-		FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}).
+		FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true, OperatorLevel: level}).
 		KeepTheTopStores(isolationComparer, true).
 		PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
 	if source == nil {
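Sketched against the new IsAvailable signature, and assuming the limiter reserves headroom for higher priority levels: a regular replica operation runs at High, while fast failover runs at Urgent so it can proceed even when the High budget is spent.

// Illustrative only:
store.IsAvailable(storelimit.RemovePeer, constant.High)   // may be throttled under churn
store.IsAvailable(storelimit.RemovePeer, constant.Urgent) // fast failover can still pass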
12 changes: 10 additions & 2 deletions pkg/schedule/filter/filters.go
@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/placement"
@@ -325,6 +326,12 @@ type StoreStateFilter struct {
 	AllowFastFailover bool
 	// Set true if allows temporary states.
 	AllowTemporaryStates bool
+	// Set the priority level of the filter; it should be the same as the operator level.
+	// In a checker the priority level can be higher than the operator level, and the
+	// operator controller should check it again using the actual operator level.
+	// If that check fails, the operator is put back into the waiting queue until the
+	// limit is available. A scheduler, however, should keep it equal to the operator level.
+	OperatorLevel constant.PriorityLevel
 	// Reason is used to distinguish the reason of store state filter
 	Reason filterType
 }
@@ -417,7 +424,7 @@ func (f *StoreStateFilter) isBusy(_ config.Config, store *core.StoreInfo) *plan.Status {
 }
 
 func (f *StoreStateFilter) exceedRemoveLimit(_ config.Config, store *core.StoreInfo) *plan.Status {
-	if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer) {
+	if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer, f.OperatorLevel) {
 		f.Reason = storeStateExceedRemoveLimit
 		return statusStoreRemoveLimit
 	}
@@ -426,7 +433,7 @@ func (f *StoreStateFilter) exceedRemoveLimit(_ config.Config, store *core.StoreInfo) *plan.Status {
 }
 
 func (f *StoreStateFilter) exceedAddLimit(_ config.Config, store *core.StoreInfo) *plan.Status {
-	if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.AddPeer) {
+	if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.AddPeer, f.OperatorLevel) {
 		f.Reason = storeStateExceedAddLimit
 		return statusStoreAddLimit
 	}
@@ -531,6 +538,7 @@ func (f *StoreStateFilter) Source(conf config.Config, store *core.StoreInfo) (status *plan.Status) {
 			return
 		}
 	}
+
 	return statusOK
 }
 
15 changes: 15 additions & 0 deletions pkg/schedule/filter/filters_test.go
@@ -23,6 +23,8 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/core/constant"
+	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/pkg/mock/mockconfig"
 	"github.com/tikv/pd/pkg/schedule/placement"
@@ -148,6 +150,19 @@ func TestRuleFitFilter(t *testing.T) {
 	re.False(leaderFilter.Target(testCluster.GetOpts(), testCluster.GetStore(6)).IsOK())
 }
 
+func TestSendStateFilter(t *testing.T) {
+	re := require.New(t)
+	store := core.NewStoreInfoWithLabel(1, map[string]string{}).Clone(core.SetStoreLimit(storelimit.NewSlidingWindows(1000)))
+	region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))
+
+	snapshotFilter := NewSnapshotSendFilter([]*core.StoreInfo{store}, constant.Medium)
+	re.NotNil(SelectOneRegion([]*core.RegionInfo{region}, nil, snapshotFilter))
+	re.True(store.GetStoreLimit().Take(1000, storelimit.SendSnapshot, constant.Medium))
+	re.True(store.GetStoreLimit().Take(1000, storelimit.SendSnapshot, constant.Medium))
+	snapshotFilter = NewSnapshotSendFilter([]*core.StoreInfo{store}, constant.Medium)
+	re.Nil(SelectOneRegion([]*core.RegionInfo{region}, nil, snapshotFilter))
+}
+
 func TestStoreStateFilter(t *testing.T) {
 	re := require.New(t)
 	filters := []Filter{
28 changes: 28 additions & 0 deletions pkg/schedule/filter/region_filters.go
@@ -16,6 +16,8 @@ package filter
 
 import (
 	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/core/constant"
+	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/schedule/plan"
 	"github.com/tikv/pd/pkg/slice"
@@ -164,3 +166,29 @@ func (f *regionWitnessFilter) Select(region *core.RegionInfo) *plan.Status {
 	}
 	return statusOK
 }
+
+// SnapshotSenderFilter filters out regions whose leader store has reached the send-snapshot limit.
+type SnapshotSenderFilter struct {
+	senders map[uint64]struct{}
+}
+
+// NewSnapshotSendFilter creates a RegionFilter that filters regions whose leader store reaches the send-snapshot limit.
+// The level should be set to the same value as the operator priority level.
+func NewSnapshotSendFilter(stores []*core.StoreInfo, level constant.PriorityLevel) RegionFilter {
+	senders := make(map[uint64]struct{})
+	for _, store := range stores {
+		if store.IsAvailable(storelimit.SendSnapshot, level) && !store.IsBusy() {
+			senders[store.GetID()] = struct{}{}
+		}
+	}
+	return &SnapshotSenderFilter{senders: senders}
+}
+
+// Select returns ok if the region leader is in the senders.
+func (f *SnapshotSenderFilter) Select(region *core.RegionInfo) *plan.Status {
+	leaderStoreID := region.GetLeader().GetStoreId()
+	if _, ok := f.senders[leaderStoreID]; ok {
+		return statusOK
+	}
+	return statusRegionLeaderSendSnapshotThrottled
+}
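Usage mirrors TestSendStateFilter above; a sketch from a scheduler's perspective, where cluster and candidateRegions are illustrative:

// Sketch: keep only regions whose leader store can still send a snapshot.
f := filter.NewSnapshotSendFilter(cluster.GetStores(), constant.Medium)
region := filter.SelectOneRegion(candidateRegions, nil, f)
if region == nil {
	// every candidate's leader store is throttled for snapshot sending
}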
13 changes: 7 additions & 6 deletions pkg/schedule/filter/status.go
@@ -43,10 +43,11 @@
 	statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)
 
 	// region filter status
-	statusRegionPendingPeer   = plan.NewStatus(plan.StatusRegionUnhealthy)
-	statusRegionDownPeer      = plan.NewStatus(plan.StatusRegionUnhealthy)
-	statusRegionEmpty         = plan.NewStatus(plan.StatusRegionEmpty)
-	statusRegionNotMatchRule  = plan.NewStatus(plan.StatusRegionNotMatchRule)
-	statusRegionNotReplicated = plan.NewStatus(plan.StatusRegionNotReplicated)
-	statusRegionWitnessPeer   = plan.NewStatus(plan.StatusRegionNotMatchRule)
+	statusRegionPendingPeer                 = plan.NewStatus(plan.StatusRegionUnhealthy)
+	statusRegionDownPeer                    = plan.NewStatus(plan.StatusRegionUnhealthy)
+	statusRegionEmpty                       = plan.NewStatus(plan.StatusRegionEmpty)
+	statusRegionNotMatchRule                = plan.NewStatus(plan.StatusRegionNotMatchRule)
+	statusRegionNotReplicated               = plan.NewStatus(plan.StatusRegionNotReplicated)
+	statusRegionWitnessPeer                 = plan.NewStatus(plan.StatusRegionNotMatchRule)
+	statusRegionLeaderSendSnapshotThrottled = plan.NewStatus(plan.StatusRegionSendSnapshotThrottled)
 )
2 changes: 2 additions & 0 deletions pkg/schedule/plan/status.go
@@ -92,6 +92,8 @@ const (
 	StatusNoTargetRegion
 	// StatusRegionLabelReject represents the plan conflicts with region label.
 	StatusRegionLabelReject
+	// StatusRegionSendSnapshotThrottled represents the plan conflicts with send snapshot.
+	StatusRegionSendSnapshotThrottled
 )
 
 const (