Skip to content

Commit

Permalink
tests: make TestSplitKeyspaceGroup stable (#6584)
Browse files Browse the repository at this point in the history
close #6571

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 authored Jun 13, 2023
1 parent 170d287 commit f069411
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(kgm.Bootstrap(suite.ctx))
suite.NoError(suite.manager.Bootstrap())
}

Expand Down
55 changes: 36 additions & 19 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -51,9 +52,11 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64

sync.RWMutex
// groups is the cache of keyspace group related information.
Expand Down Expand Up @@ -90,24 +93,24 @@ func NewKeyspaceGroupManager(
cancel: cancel,
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
}

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
func (m *GroupManager) Bootstrap(ctx context.Context) error {
// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
Expand Down Expand Up @@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// It will only alloc node when the group manager is on API leader.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups(ctx)
}
return nil
}

Expand All @@ -146,7 +154,7 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
Expand All @@ -158,7 +166,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-m.ctx.Done():
case <-ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
Expand Down Expand Up @@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
Expand Down Expand Up @@ -380,6 +383,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
return config, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
Expand All @@ -391,6 +396,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
return err
}

failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
defer m.Unlock()
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
Expand Down Expand Up @@ -425,7 +436,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
}
return nil
Expand Down Expand Up @@ -696,8 +706,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
Expand All @@ -714,6 +726,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// IsExistNode checks if the node exists.
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
suite.NoError(suite.kgm.Bootstrap(suite.ctx))
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (c *RaftCluster) Start(s Server) error {
}

if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
Expand Down
51 changes: 48 additions & 3 deletions tests/pdctl/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func TestKeyspaceGroup(t *testing.T) {
}

func TestSplitKeyspaceGroup(t *testing.T) {
t.Skip("skip this super flaky split keyspace group test which impacts everyone's productivity.")
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 500; i++ {
// we test the case which exceed the default max txn ops limit in etcd, which is 128.
for i := 0; i < 129; i++ {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
Expand Down Expand Up @@ -127,8 +127,53 @@ func TestSplitKeyspaceGroup(t *testing.T) {
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
return strings.Contains(string(output), "Success")
}, testutil.WithWaitFor(20*time.Second))
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

func TestExternalAllocNodeWhenStart(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// external alloc node for keyspace group, when keyspace manager update keyspace info to keyspace group
// we hope the keyspace group can be updated correctly.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/externalAllocNode", `return("127.0.0.1:2379,127.0.0.1:2380")`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 10; i++ {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

cmd := pdctlCmd.GetRootCmd()

time.Sleep(2 * time.Second)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

// check keyspace group information.
defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID)
args := []string{"-u", pdAddr, "keyspace-group"}
testutil.Eventually(re, func() bool {
output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...)
re.NoError(err)
var keyspaceGroup endpoint.KeyspaceGroup
err = json.Unmarshal(output, &keyspaceGroup)
re.NoError(err)
return len(keyspaceGroup.Keyspaces) == len(keyspaces)+1 && len(keyspaceGroup.Members) == 2
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/externalAllocNode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}
9 changes: 2 additions & 7 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
)

func TestScheduler(t *testing.T) {
t.Skip("skip this super unstable test which impacts everyone's productivity")
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -464,12 +463,8 @@ func TestScheduler(t *testing.T) {
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))

testutil.Eventually(re, func() bool {
return result["status"] == "paused" && result["summary"] == ""
}, testutil.WithTickInterval(50*time.Millisecond))
return len(result) != 0 && result["status"] == "paused" && result["summary"] == ""
}, testutil.WithWaitFor(30*time.Second))

mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"})
mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
Expand Down

0 comments on commit f069411

Please sign in to comment.